Introduces FileIO.read()

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/910d02fb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/910d02fb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/910d02fb

Branch: refs/heads/master
Commit: 910d02fb464d8a91c7149f214419964cb834639b
Parents: 80b9cf9
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Thu Aug 31 16:11:25 2017 -0700
Committer: Eugene Kirpichov <ekirpic...@gmail.com>
Committed: Sun Sep 3 16:32:25 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/coders/CoderRegistry.java   |   6 +
 .../java/org/apache/beam/sdk/io/FileIO.java     | 181 ++++++++++-
 .../apache/beam/sdk/io/ReadableFileCoder.java   |  61 ++++
 .../java/org/apache/beam/sdk/io/FileIOTest.java | 313 +++++++++++++++++++
 4 files changed, 559 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/910d02fb/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
index c335bda..012d6de 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
@@ -43,6 +43,8 @@ import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.CannotProvideCoderException.ReasonCode;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.ReadableFileCoder;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.MetadataCoder;
 import org.apache.beam.sdk.io.fs.ResourceId;
@@ -119,6 +121,10 @@ public class CoderRegistry {
           CoderProviders.fromStaticMethods(Metadata.class, 
MetadataCoder.class));
       builder.put(ResourceId.class,
           CoderProviders.fromStaticMethods(ResourceId.class, 
ResourceIdCoder.class));
+      builder.put(
+          FileIO.ReadableFile.class,
+          CoderProviders.fromStaticMethods(
+              FileIO.ReadableFile.class, ReadableFileCoder.class));
       builder.put(Set.class,
           CoderProviders.fromStaticMethods(Set.class, SetCoder.class));
       builder.put(String.class,

http://git-wip-us.apache.org/repos/asf/beam/blob/910d02fb/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
index 1eb81df..fcae0f7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
@@ -17,13 +17,22 @@
  */
 package org.apache.beam.sdk.io;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
 import com.google.auto.value.AutoValue;
+import java.io.IOException;
 import java.io.Serializable;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.charset.StandardCharsets;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
 import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -34,6 +43,7 @@ import org.apache.beam.sdk.transforms.Watch;
 import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.util.StreamUtils;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
@@ -43,7 +53,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Transforms for working with files. Currently includes matching of 
filepatterns via {@link #match}
- * and {@link #matchAll}.
+ * and {@link #matchAll}, and reading matches via {@link #readMatches}.
  */
 public class FileIO {
   private static final Logger LOG = LoggerFactory.getLogger(FileIO.class);
@@ -83,6 +93,86 @@ public class FileIO {
   }
 
   /**
+   * Returns a transform that turns each {@link MatchResult.Metadata} produced by {@link #match} or
+   * {@link #matchAll} into a {@link ReadableFile}, which can be used to read the contents of that
+   * file, optionally decompressing it.
+   *
+   * <p>By default the compression is {@link Compression#AUTO} (detected from each file name) and
+   * directories in the input are skipped.
+   */
+  public static ReadMatches readMatches() {
+    ReadMatches.Builder defaults = new AutoValue_FileIO_ReadMatches.Builder();
+    defaults.setCompression(Compression.AUTO);
+    defaults.setDirectoryTreatment(ReadMatches.DirectoryTreatment.SKIP);
+    return defaults.build();
+  }
+
+  /**
+   * A utility class for accessing a potentially compressed file.
+   *
+   * <p>Instances are produced by {@link FileIO#readMatches} and encoded by {@link
+   * ReadableFileCoder}.
+   */
+  public static final class ReadableFile {
+    private final ResourceId resourceId;
+    private final long sizeBytes;
+    private final boolean isSeekable;
+    private final Compression compression;
+
+    ReadableFile(
+        ResourceId resourceId,
+        long sizeBytes,
+        boolean isSeekable,
+        Compression compression) {
+      this.resourceId = resourceId;
+      this.sizeBytes = sizeBytes;
+      this.isSeekable = isSeekable;
+      this.compression = compression;
+    }
+
+    /** Returns the {@link ResourceId} of the file. */
+    public ResourceId getResourceId() {
+      return resourceId;
+    }
+
+    /** Returns the size of the file in bytes (before decompression). */
+    public long getSizeBytes() {
+      return sizeBytes;
+    }
+
+    /**
+     * Returns whether or not the channel returned by {@link #open} can be efficiently seeked.
+     * If true, then {@link #open} will return a {@link SeekableByteChannel}.
+     */
+    public boolean isSeekable() {
+      return isSeekable;
+    }
+
+    /** Returns the method with which this file will be decompressed in {@link #open}. */
+    public Compression getCompression() {
+      return compression;
+    }
+
+    /**
+     * Returns a {@link ReadableByteChannel} reading the data from this file, potentially
+     * decompressing it using {@link #getCompression}. Each call opens a new channel, which the
+     * caller is responsible for closing.
+     */
+    public ReadableByteChannel open() throws IOException {
+      return compression.readDecompressed(FileSystems.open(resourceId));
+    }
+
+    /**
+     * Returns a {@link SeekableByteChannel} equivalent to {@link #open}, but fails if this file is
+     * not {@link #isSeekable() seekable}. The caller is responsible for closing the channel.
+     *
+     * @throws IllegalStateException if this file is not seekable
+     */
+    public SeekableByteChannel openSeekable() throws IOException {
+      checkState(isSeekable(), "The file %s is not seekable", resourceId);
+      return ((SeekableByteChannel) open());
+    }
+
+    /**
+     * Returns the full contents of the file as bytes, decompressed according to {@link
+     * #getCompression}. The channel used for reading is closed before this method returns.
+     */
+    public byte[] readFullyAsBytes() throws IOException {
+      // Closing the channel (rather than leaving the wrapping stream to the GC) releases the
+      // underlying file handle deterministically.
+      try (ReadableByteChannel channel = open()) {
+        return StreamUtils.getBytes(Channels.newInputStream(channel));
+      }
+    }
+
+    /** Returns the full contents of the file as a {@link String} decoded as UTF-8. */
+    public String readFullyAsUTF8String() throws IOException {
+      return new String(readFullyAsBytes(), StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public String toString() {
+      return "ReadableFile{resourceId=" + resourceId
+          + ", sizeBytes=" + sizeBytes
+          + ", isSeekable=" + isSeekable
+          + ", compression=" + compression
+          + "}";
+    }
+  }
+
+  /**
    * Describes configuration for matching filepatterns, such as {@link 
EmptyMatchTreatment}
    * and continuous watching for matching files.
    */
@@ -138,7 +228,7 @@ public class FileIO {
   /** Implementation of {@link #match}. */
   @AutoValue
   public abstract static class Match extends PTransform<PBegin, 
PCollection<MatchResult.Metadata>> {
-    abstract ValueProvider<String> getFilepattern();
+    @Nullable abstract ValueProvider<String> getFilepattern();
     abstract MatchConfiguration getConfiguration();
     abstract Builder toBuilder();
 
@@ -262,4 +352,91 @@ public class FileIO {
       }
     }
   }
+
+  /** Implementation of {@link #readMatches}. */
+  @AutoValue
+  public abstract static class ReadMatches
+      extends PTransform<PCollection<MatchResult.Metadata>, PCollection<ReadableFile>> {
+    /**
+     * Enumerates possible handling of directories in the input {@link PCollection}.
+     *
+     * <p>Public because it is a parameter of the public {@link #withDirectoryTreatment} method.
+     */
+    public enum DirectoryTreatment {
+      /** Directories are dropped and produce no output. */
+      SKIP,
+      /** Encountering a directory throws an {@link IllegalArgumentException}. */
+      PROHIBIT
+    }
+
+    abstract Compression getCompression();
+
+    abstract DirectoryTreatment getDirectoryTreatment();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setCompression(Compression compression);
+
+      abstract Builder setDirectoryTreatment(DirectoryTreatment directoryTreatment);
+
+      abstract ReadMatches build();
+    }
+
+    /**
+     * Reads files using the given {@link Compression}. Default is {@link Compression#AUTO}, which
+     * detects the compression from each file's name.
+     */
+    public ReadMatches withCompression(Compression compression) {
+      checkArgument(compression != null, "compression can not be null");
+      return toBuilder().setCompression(compression).build();
+    }
+
+    /**
+     * Controls how to handle directories in the input {@link PCollection}. Default is {@link
+     * DirectoryTreatment#SKIP}.
+     */
+    public ReadMatches withDirectoryTreatment(DirectoryTreatment directoryTreatment) {
+      checkArgument(directoryTreatment != null, "directoryTreatment can not be null");
+      return toBuilder().setDirectoryTreatment(directoryTreatment).build();
+    }
+
+    @Override
+    public PCollection<ReadableFile> expand(PCollection<MatchResult.Metadata> input) {
+      return input.apply(ParDo.of(new ToReadableFileFn(this)));
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder.add(DisplayData.item("compression", getCompression().toString()));
+      builder.add(DisplayData.item("directoryTreatment", getDirectoryTreatment().toString()));
+    }
+
+    /** Converts each matched {@link MatchResult.Metadata} into a {@link ReadableFile}. */
+    private static class ToReadableFileFn extends DoFn<MatchResult.Metadata, ReadableFile> {
+      private final ReadMatches spec;
+
+      private ToReadableFileFn(ReadMatches spec) {
+        this.spec = spec;
+      }
+
+      @ProcessElement
+      public void process(ProcessContext c) {
+        MatchResult.Metadata metadata = c.element();
+        if (metadata.resourceId().isDirectory()) {
+          switch (spec.getDirectoryTreatment()) {
+            case SKIP:
+              return;
+
+            case PROHIBIT:
+              throw new IllegalArgumentException(
+                  "Trying to read " + metadata.resourceId() + " which is a directory");
+
+            default:
+              throw new UnsupportedOperationException(
+                  "Unknown DirectoryTreatment: " + spec.getDirectoryTreatment());
+          }
+        }
+
+        // AUTO is resolved per file from its name; an explicit compression is used as-is.
+        Compression compression =
+            (spec.getCompression() == Compression.AUTO)
+                ? Compression.detect(metadata.resourceId().getFilename())
+                : spec.getCompression();
+        // Seekability is only reported for uncompressed files, because
+        // ReadableFile.openSeekable() casts the channel returned by open() to SeekableByteChannel.
+        c.output(
+            new ReadableFile(
+                metadata.resourceId(),
+                metadata.sizeBytes(),
+                metadata.isReadSeekEfficient() && compression == Compression.UNCOMPRESSED,
+                compression));
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/910d02fb/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java
new file mode 100644
index 0000000..4ef069c
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.BooleanCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.fs.ResourceIdCoder;
+
+/**
+ * A {@link Coder} for {@link FileIO.ReadableFile}.
+ *
+ * <p>Encodes, in order: the resource id, the size in bytes, the seekability flag and the
+ * compression. The compression is encoded by its {@link Enum#ordinal() ordinal}, so reordering or
+ * removing {@link Compression} constants changes the meaning of previously encoded values.
+ */
+public class ReadableFileCoder extends AtomicCoder<FileIO.ReadableFile> {
+  private static final ReadableFileCoder INSTANCE = new ReadableFileCoder();
+
+  private static final BooleanCoder IS_SEEKABLE_CODER = BooleanCoder.of();
+  private static final VarIntCoder COMPRESSION_CODER = VarIntCoder.of();
+  private static final ResourceIdCoder RESOURCE_ID_CODER = ResourceIdCoder.of();
+  private static final VarLongCoder SIZE_CODER = VarLongCoder.of();
+
+  /** Cached copy of {@link Compression#values()}, which allocates a new array on every call. */
+  private static final Compression[] COMPRESSIONS = Compression.values();
+
+  /** Returns the instance of {@link ReadableFileCoder}. */
+  public static ReadableFileCoder of() {
+    return INSTANCE;
+  }
+
+  @Override
+  public void encode(FileIO.ReadableFile value, OutputStream os) throws IOException {
+    RESOURCE_ID_CODER.encode(value.getResourceId(), os);
+    SIZE_CODER.encode(value.getSizeBytes(), os);
+    IS_SEEKABLE_CODER.encode(value.isSeekable(), os);
+    COMPRESSION_CODER.encode(value.getCompression().ordinal(), os);
+  }
+
+  /**
+   * Decodes a {@link FileIO.ReadableFile} written by {@link #encode}.
+   *
+   * @throws IOException if the stream cannot be read or holds an unknown compression ordinal
+   */
+  @Override
+  public FileIO.ReadableFile decode(InputStream is) throws IOException {
+    ResourceId resourceId = RESOURCE_ID_CODER.decode(is);
+    long sizeBytes = SIZE_CODER.decode(is);
+    boolean isSeekable = IS_SEEKABLE_CODER.decode(is);
+    int compressionOrdinal = COMPRESSION_CODER.decode(is);
+    // Report corrupt or incompatible input as an IOException rather than an
+    // ArrayIndexOutOfBoundsException from the array lookup below.
+    if (compressionOrdinal < 0 || compressionOrdinal >= COMPRESSIONS.length) {
+      throw new IOException(
+          "Invalid Compression ordinal "
+              + compressionOrdinal
+              + ", expected a value in [0, "
+              + COMPRESSIONS.length
+              + ")");
+    }
+    return new FileIO.ReadableFile(
+        resourceId, sizeBytes, isSeekable, COMPRESSIONS[compressionOrdinal]);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/910d02fb/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
new file mode 100644
index 0000000..341d86a
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import static org.hamcrest.Matchers.isA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import java.util.zip.GZIPOutputStream;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesSplittableParDo;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link FileIO}. */
+@RunWith(JUnit4.class)
+public class FileIOTest implements Serializable {
+  @Rule public transient TestPipeline p = TestPipeline.create();
+
+  @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testMatchAndMatchAll() throws IOException {
+    Path firstPath = tmpFolder.newFile("first").toPath();
+    Path secondPath = tmpFolder.newFile("second").toPath();
+    int firstSize = 37;
+    int secondSize = 42;
+    Files.write(firstPath, new byte[firstSize]);
+    Files.write(secondPath, new byte[secondSize]);
+
+    PAssert.that(
+            p.apply(
+                "Match existing",
+                FileIO.match().filepattern(tmpFolder.getRoot().getAbsolutePath() + "/*")))
+        .containsInAnyOrder(metadata(firstPath, firstSize), metadata(secondPath, secondSize));
+    PAssert.that(
+            p.apply(
+                "Match existing with provider",
+                FileIO.match()
+                    .filepattern(p.newProvider(tmpFolder.getRoot().getAbsolutePath() + "/*"))))
+        .containsInAnyOrder(metadata(firstPath, firstSize), metadata(secondPath, secondSize));
+    PAssert.that(
+            p.apply("Create existing", Create.of(tmpFolder.getRoot().getAbsolutePath() + "/*"))
+                .apply("MatchAll existing", FileIO.matchAll()))
+        .containsInAnyOrder(metadata(firstPath, firstSize), metadata(secondPath, secondSize));
+
+    PAssert.that(
+            p.apply(
+                "Match non-existing ALLOW",
+                FileIO.match()
+                    .filepattern(tmpFolder.getRoot().getAbsolutePath() + "/blah")
+                    .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW)))
+        .containsInAnyOrder();
+    PAssert.that(
+            p.apply(
+                    "Create non-existing",
+                    Create.of(tmpFolder.getRoot().getAbsolutePath() + "/blah"))
+                .apply(
+                    "MatchAll non-existing ALLOW",
+                    FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW)))
+        .containsInAnyOrder();
+
+    PAssert.that(
+            p.apply(
+                "Match non-existing ALLOW_IF_WILDCARD",
+                FileIO.match()
+                    .filepattern(tmpFolder.getRoot().getAbsolutePath() + "/blah*")
+                    .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)))
+        .containsInAnyOrder();
+    PAssert.that(
+            p.apply(
+                    "Create non-existing wildcard + explicit",
+                    Create.of(tmpFolder.getRoot().getAbsolutePath() + "/blah*"))
+                .apply(
+                    "MatchAll non-existing ALLOW_IF_WILDCARD",
+                    FileIO.matchAll()
+                        .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)))
+        .containsInAnyOrder();
+    PAssert.that(
+            p.apply(
+                    "Create non-existing wildcard + default",
+                    Create.of(tmpFolder.getRoot().getAbsolutePath() + "/blah*"))
+                .apply("MatchAll non-existing default", FileIO.matchAll()))
+        .containsInAnyOrder();
+
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testMatchDisallowEmptyDefault() throws IOException {
+    p.apply("Match", FileIO.match().filepattern(tmpFolder.getRoot().getAbsolutePath() + "/*"));
+
+    thrown.expectCause(isA(FileNotFoundException.class));
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testMatchDisallowEmptyExplicit() throws IOException {
+    p.apply(
+        FileIO.match()
+            .filepattern(tmpFolder.getRoot().getAbsolutePath() + "/*")
+            .withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW));
+
+    thrown.expectCause(isA(FileNotFoundException.class));
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testMatchDisallowEmptyNonWildcard() throws IOException {
+    p.apply(
+        FileIO.match()
+            .filepattern(tmpFolder.getRoot().getAbsolutePath() + "/blah")
+            .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD));
+
+    thrown.expectCause(isA(FileNotFoundException.class));
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testMatchAllDisallowEmptyExplicit() throws IOException {
+    p.apply(Create.of(tmpFolder.getRoot().getAbsolutePath() + "/*"))
+        .apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW));
+    thrown.expectCause(isA(FileNotFoundException.class));
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testMatchAllDisallowEmptyNonWildcard() throws IOException {
+    p.apply(Create.of(tmpFolder.getRoot().getAbsolutePath() + "/blah"))
+        .apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD));
+    thrown.expectCause(isA(FileNotFoundException.class));
+    p.run();
+  }
+
+  @Test
+  @Category({NeedsRunner.class, UsesSplittableParDo.class})
+  public void testMatchWatchForNewFiles() throws IOException, InterruptedException {
+    final Path basePath = tmpFolder.getRoot().toPath().resolve("watch");
+    // Unlike File.mkdir(), whose boolean result was ignored, this throws if the directory
+    // cannot be created, so setup failures surface here instead of as missing matches later.
+    Files.createDirectory(basePath);
+    PCollection<MatchResult.Metadata> matchMetadata =
+        p.apply(
+            FileIO.match()
+                .filepattern(basePath.resolve("*").toString())
+                .continuously(
+                    Duration.millis(100),
+                    Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardSeconds(3))));
+    PCollection<MatchResult.Metadata> matchAllMetadata =
+        p.apply(Create.of(basePath.resolve("*").toString()))
+            .apply(
+                FileIO.matchAll()
+                    .continuously(
+                        Duration.millis(100),
+                        Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardSeconds(3))));
+
+    // Writes the files after the pipeline has started watching, spaced apart in time.
+    Thread writer =
+        new Thread() {
+          @Override
+          public void run() {
+            try {
+              Thread.sleep(1000);
+              Files.write(basePath.resolve("first"), new byte[42]);
+              Thread.sleep(300);
+              Files.write(basePath.resolve("second"), new byte[37]);
+              Thread.sleep(300);
+              Files.write(basePath.resolve("third"), new byte[99]);
+            } catch (IOException | InterruptedException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        };
+    writer.start();
+
+    List<MatchResult.Metadata> expected =
+        Arrays.asList(
+            metadata(basePath.resolve("first"), 42),
+            metadata(basePath.resolve("second"), 37),
+            metadata(basePath.resolve("third"), 99));
+    PAssert.that(matchMetadata).containsInAnyOrder(expected);
+    PAssert.that(matchAllMetadata).containsInAnyOrder(expected);
+    p.run();
+
+    writer.join();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testRead() throws IOException {
+    final String path = tmpFolder.newFile("file").getAbsolutePath();
+    final String pathGZ = tmpFolder.newFile("file.gz").getAbsolutePath();
+    // Encode the fixtures as UTF-8 explicitly, matching how readFullyAsUTF8String() decodes them,
+    // instead of depending on the platform default charset.
+    Files.write(new File(path).toPath(), "Hello world".getBytes(StandardCharsets.UTF_8));
+    try (Writer writer =
+        new OutputStreamWriter(
+            new GZIPOutputStream(new FileOutputStream(pathGZ)), StandardCharsets.UTF_8)) {
+      writer.write("Hello world");
+    }
+
+    PCollection<MatchResult.Metadata> matches = p.apply("Match", FileIO.match().filepattern(path));
+    PCollection<FileIO.ReadableFile> decompressedAuto =
+        matches.apply("Read AUTO", FileIO.readMatches().withCompression(Compression.AUTO));
+    PCollection<FileIO.ReadableFile> decompressedDefault =
+        matches.apply("Read default", FileIO.readMatches());
+    PCollection<FileIO.ReadableFile> decompressedUncompressed =
+        matches.apply(
+            "Read UNCOMPRESSED", FileIO.readMatches().withCompression(Compression.UNCOMPRESSED));
+    for (PCollection<FileIO.ReadableFile> c :
+        Arrays.asList(decompressedAuto, decompressedDefault, decompressedUncompressed)) {
+      PAssert.thatSingleton(c)
+          .satisfies(
+              new SerializableFunction<FileIO.ReadableFile, Void>() {
+                @Override
+                public Void apply(FileIO.ReadableFile input) {
+                  assertEquals(path, input.getResourceId().toString());
+                  assertEquals("Hello world".length(), input.getSizeBytes());
+                  assertEquals(Compression.UNCOMPRESSED, input.getCompression());
+                  assertTrue(input.isSeekable());
+                  try {
+                    assertEquals("Hello world", input.readFullyAsUTF8String());
+                  } catch (IOException e) {
+                    throw new RuntimeException(e);
+                  }
+                  return null;
+                }
+              });
+    }
+
+    PCollection<MatchResult.Metadata> matchesGZ =
+        p.apply("Match GZ", FileIO.match().filepattern(pathGZ));
+    PCollection<FileIO.ReadableFile> compressionAuto =
+        matchesGZ.apply("Read GZ AUTO", FileIO.readMatches().withCompression(Compression.AUTO));
+    PCollection<FileIO.ReadableFile> compressionDefault =
+        matchesGZ.apply("Read GZ default", FileIO.readMatches());
+    PCollection<FileIO.ReadableFile> compressionGzip =
+        matchesGZ.apply("Read GZ GZIP", FileIO.readMatches().withCompression(Compression.GZIP));
+    for (PCollection<FileIO.ReadableFile> c :
+        Arrays.asList(compressionAuto, compressionDefault, compressionGzip)) {
+      PAssert.thatSingleton(c)
+          .satisfies(
+              new SerializableFunction<FileIO.ReadableFile, Void>() {
+                @Override
+                public Void apply(FileIO.ReadableFile input) {
+                  assertEquals(pathGZ, input.getResourceId().toString());
+                  assertFalse(input.getSizeBytes() == "Hello world".length());
+                  assertEquals(Compression.GZIP, input.getCompression());
+                  assertFalse(input.isSeekable());
+                  try {
+                    assertEquals("Hello world", input.readFullyAsUTF8String());
+                  } catch (IOException e) {
+                    throw new RuntimeException(e);
+                  }
+                  return null;
+                }
+              });
+    }
+
+    p.run();
+  }
+
+  /** Builds the metadata expected for a regular (non-directory), seek-efficient file. */
+  private static MatchResult.Metadata metadata(Path path, int size) {
+    return MatchResult.Metadata.builder()
+        .setResourceId(FileSystems.matchNewResource(path.toString(), false /* isDirectory */))
+        .setIsReadSeekEfficient(true)
+        .setSizeBytes(size)
+        .build();
+  }
+}

Reply via email to