Introduces TextIO.readFiles()

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/513d26c5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/513d26c5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/513d26c5

Branch: refs/heads/master
Commit: 513d26c5e75aed646c68e287e2dfa432c769c042
Parents: 6d4a785
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Thu Aug 31 16:43:22 2017 -0700
Committer: Eugene Kirpichov <ekirpic...@gmail.com>
Committed: Sun Sep 3 16:32:25 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/TextIO.java     | 69 +++++++++++++++-----
 .../org/apache/beam/sdk/io/TextIOReadTest.java  | 25 ++++++-
 2 files changed, 73 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/513d26c5/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index a17928e..ed5db36 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.io;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
-import static org.apache.beam.sdk.io.FileIO.ReadMatches.*;
+import static org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
@@ -64,7 +64,7 @@ import org.joda.time.Duration;
  * <p>To read a {@link PCollection} from one or more text files, use {@code TextIO.read()} to
  * instantiate a transform and use {@link TextIO.Read#from(String)} to specify the path of the
  * file(s) to be read. Alternatively, if the filenames to be read are themselves in a {@link
- * PCollection}, apply {@link TextIO#readAll()}.
+ * PCollection}, apply {@link TextIO#readAll()} or {@link TextIO#readFiles}.
  *
  * <p>{@link #read} returns a {@link PCollection} of {@link String Strings}, each corresponding to
  * one line of an input UTF-8 text file (split into lines delimited by '\n', '\r', or '\r\n',
@@ -211,11 +211,20 @@ public class TextIO {
   public static ReadAll readAll() {
     return new AutoValue_TextIO_ReadAll.Builder()
         .setCompression(Compression.AUTO)
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
+        .build();
+  }
+
+  /**
+   * Like {@link #read}, but reads each file in a {@link PCollection} of {@link
+   * FileIO.ReadableFile}, returned by {@link FileIO#readMatches}.
+   */
+  public static ReadFiles readFiles() {
+    return new AutoValue_TextIO_ReadFiles.Builder()
         // 64MB is a reasonable value that allows to amortize the cost of opening files,
         // but is not so large as to exhaust a typical runner's maximum amount of output per
         // ProcessElement call.
         .setDesiredBundleSizeBytes(64 * 1024 * 1024L)
-        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
         .build();
   }
 
@@ -422,9 +431,7 @@ public class TextIO {
       extends PTransform<PCollection<String>, PCollection<String>> {
     abstract MatchConfiguration getMatchConfiguration();
     abstract Compression getCompression();
-    abstract long getDesiredBundleSizeBytes();
-    @Nullable
-    abstract byte[] getDelimiter();
+    @Nullable abstract byte[] getDelimiter();
 
     abstract Builder toBuilder();
 
@@ -432,7 +439,6 @@ public class TextIO {
     abstract static class Builder {
       abstract Builder setMatchConfiguration(MatchConfiguration matchConfiguration);
       abstract Builder setCompression(Compression compression);
-      abstract Builder setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
       abstract Builder setDelimiter(byte[] delimiter);
       abstract ReadAll build();
     }
@@ -470,11 +476,6 @@ public class TextIO {
           getMatchConfiguration().continuously(pollInterval, terminationCondition));
     }
 
-    @VisibleForTesting
-    ReadAll withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
-      return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
-    }
-
     ReadAll withDelimiter(byte[] delimiter) {
       return toBuilder().setDelimiter(delimiter).build();
     }
@@ -487,12 +488,7 @@ public class TextIO {
               FileIO.readMatches()
                   .withCompression(getCompression())
                   .withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
-          .apply(
-              "Read all via FileBasedSource",
-              new ReadAllViaFileBasedSource<>(
-                  getDesiredBundleSizeBytes(),
-                  new CreateTextSourceFn(getDelimiter()),
-                  StringUtf8Coder.of()));
+          .apply(readFiles().withDelimiter(getDelimiter()));
     }
 
     @Override
@@ -509,6 +505,43 @@ public class TextIO {
           .include("matchConfiguration", getMatchConfiguration());
     }
 
+  }
+
+  /** Implementation of {@link #readFiles}. */
+  @AutoValue
+  public abstract static class ReadFiles
+      extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<String>> {
+    abstract long getDesiredBundleSizeBytes();
+    @Nullable abstract byte[] getDelimiter();
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
+      abstract Builder setDelimiter(byte[] delimiter);
+      abstract ReadFiles build();
+    }
+
+    @VisibleForTesting
+    ReadFiles withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
+      return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
+    }
+
+    /** Like {@link Read#withDelimiter}. */
+    public ReadFiles withDelimiter(byte[] delimiter) {
+      return toBuilder().setDelimiter(delimiter).build();
+    }
+
+    @Override
+    public PCollection<String> expand(PCollection<FileIO.ReadableFile> input) {
+      return input.apply(
+          "Read all via FileBasedSource",
+          new ReadAllViaFileBasedSource<>(
+              getDesiredBundleSizeBytes(),
+              new CreateTextSourceFn(getDelimiter()),
+              StringUtf8Coder.of()));
+    }
+
     private static class CreateTextSourceFn
         implements SerializableFunction<String, FileBasedSource<String>> {
       private byte[] delimiter;

http://git-wip-us.apache.org/repos/asf/beam/blob/513d26c5/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
index e708b46..3a8757e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
@@ -380,7 +380,7 @@ public class TextIOReadTest {
         .containsInAnyOrder(expected);
 
     TextIO.ReadAll readAll =
-        TextIO.readAll().withCompression(compression).withDesiredBundleSizeBytes(10);
+        TextIO.readAll().withCompression(compression);
     PAssert.that(
             p.apply("Create_" + file + "_" + thisUniquifier, Create.of(file.getPath()))
                 .apply("Read_" + compression.toString() + "_" + thisUniquifier, readAll))
@@ -926,9 +926,9 @@ public class TextIOReadTest {
   @Category(NeedsRunner.class)
   public void testReadAll() throws IOException {
     writeToFile(TINY, "readAllTiny1.zip", ZIP);
-    writeToFile(TINY, "readAllTiny2.zip", ZIP);
+    writeToFile(TINY, "readAllTiny2.txt", UNCOMPRESSED);
     writeToFile(LARGE, "readAllLarge1.zip", ZIP);
-    writeToFile(LARGE, "readAllLarge2.zip", ZIP);
+    writeToFile(LARGE, "readAllLarge2.txt", UNCOMPRESSED);
     PCollection<String> lines =
         p.apply(
                 Create.of(
@@ -940,6 +940,25 @@ public class TextIOReadTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
+  public void testReadFiles() throws IOException {
+    writeToFile(TINY, "readAllTiny1.zip", ZIP);
+    writeToFile(TINY, "readAllTiny2.txt", UNCOMPRESSED);
+    writeToFile(LARGE, "readAllLarge1.zip", ZIP);
+    writeToFile(LARGE, "readAllLarge2.txt", UNCOMPRESSED);
+    PCollection<String> lines =
+        p.apply(
+                Create.of(
+                    tempFolder.resolve("readAllTiny*").toString(),
+                    tempFolder.resolve("readAllLarge*").toString()))
+            .apply(FileIO.matchAll())
+            .apply(FileIO.readMatches().withCompression(AUTO))
+            .apply(TextIO.readFiles().withDesiredBundleSizeBytes(10));
+    PAssert.that(lines).containsInAnyOrder(Iterables.concat(TINY, TINY, LARGE, LARGE));
+    p.run();
+  }
+
+  @Test
   @Category({NeedsRunner.class, UsesSplittableParDo.class})
   public void testReadWatchForNewFiles() throws IOException, InterruptedException {
     final Path basePath = tempFolder.resolve("readWatch");

Reply via email to