Introduces TextIO.readFiles()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/513d26c5 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/513d26c5 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/513d26c5 Branch: refs/heads/master Commit: 513d26c5e75aed646c68e287e2dfa432c769c042 Parents: 6d4a785 Author: Eugene Kirpichov <kirpic...@google.com> Authored: Thu Aug 31 16:43:22 2017 -0700 Committer: Eugene Kirpichov <ekirpic...@gmail.com> Committed: Sun Sep 3 16:32:25 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/TextIO.java | 69 +++++++++++++++----- .../org/apache/beam/sdk/io/TextIOReadTest.java | 25 ++++++- 2 files changed, 73 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/513d26c5/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index a17928e..ed5db36 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -20,7 +20,7 @@ package org.apache.beam.sdk.io; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import static org.apache.beam.sdk.io.FileIO.ReadMatches.*; +import static org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; @@ -64,7 +64,7 @@ import org.joda.time.Duration; * <p>To read a {@link PCollection} from one or more text files, use {@code TextIO.read()} to * instantiate a transform and use {@link TextIO.Read#from(String)} to specify the path of the * file(s) to be read. Alternatively, if the filenames to be read are themselves in a {@link - * PCollection}, apply {@link TextIO#readAll()}. + * PCollection}, apply {@link TextIO#readAll()} or {@link TextIO#readFiles}. * * <p>{@link #read} returns a {@link PCollection} of {@link String Strings}, each corresponding to * one line of an input UTF-8 text file (split into lines delimited by '\n', '\r', or '\r\n', @@ -211,11 +211,20 @@ public class TextIO { public static ReadAll readAll() { return new AutoValue_TextIO_ReadAll.Builder() .setCompression(Compression.AUTO) + .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD)) + .build(); + } + + /** + * Like {@link #read}, but reads each file in a {@link PCollection} of {@link + * FileIO.ReadableFile}, returned by {@link FileIO#readMatches}. + */ + public static ReadFiles readFiles() { + return new AutoValue_TextIO_ReadFiles.Builder() // 64MB is a reasonable value that allows to amortize the cost of opening files, // but is not so large as to exhaust a typical runner's maximum amount of output per // ProcessElement call. .setDesiredBundleSizeBytes(64 * 1024 * 1024L) - .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD)) .build(); } @@ -422,9 +431,7 @@ public class TextIO { extends PTransform<PCollection<String>, PCollection<String>> { abstract MatchConfiguration getMatchConfiguration(); abstract Compression getCompression(); - abstract long getDesiredBundleSizeBytes(); - @Nullable - abstract byte[] getDelimiter(); + @Nullable abstract byte[] getDelimiter(); abstract Builder toBuilder(); @@ -432,7 +439,6 @@ public class TextIO { abstract static class Builder { abstract Builder setMatchConfiguration(MatchConfiguration matchConfiguration); abstract Builder setCompression(Compression compression); - abstract Builder setDesiredBundleSizeBytes(long desiredBundleSizeBytes); abstract Builder setDelimiter(byte[] delimiter); abstract ReadAll build(); } @@ -470,11 +476,6 @@ public class TextIO { getMatchConfiguration().continuously(pollInterval, terminationCondition)); } - @VisibleForTesting - ReadAll withDesiredBundleSizeBytes(long desiredBundleSizeBytes) { - return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build(); - } - ReadAll withDelimiter(byte[] delimiter) { return toBuilder().setDelimiter(delimiter).build(); } @@ -487,12 +488,7 @@ public class TextIO { FileIO.readMatches() .withCompression(getCompression()) .withDirectoryTreatment(DirectoryTreatment.PROHIBIT)) - .apply( - "Read all via FileBasedSource", - new ReadAllViaFileBasedSource<>( - getDesiredBundleSizeBytes(), - new CreateTextSourceFn(getDelimiter()), - StringUtf8Coder.of())); + .apply(readFiles().withDelimiter(getDelimiter())); } @Override @@ -509,6 +505,43 @@ public class TextIO { .include("matchConfiguration", getMatchConfiguration()); } + } + + /** Implementation of {@link #readFiles}. */ + @AutoValue + public abstract static class ReadFiles + extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<String>> { + abstract long getDesiredBundleSizeBytes(); + @Nullable abstract byte[] getDelimiter(); + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setDesiredBundleSizeBytes(long desiredBundleSizeBytes); + abstract Builder setDelimiter(byte[] delimiter); + abstract ReadFiles build(); + } + + @VisibleForTesting + ReadFiles withDesiredBundleSizeBytes(long desiredBundleSizeBytes) { + return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build(); + } + + /** Like {@link Read#withDelimiter}. */ + public ReadFiles withDelimiter(byte[] delimiter) { + return toBuilder().setDelimiter(delimiter).build(); + } + + @Override + public PCollection<String> expand(PCollection<FileIO.ReadableFile> input) { + return input.apply( + "Read all via FileBasedSource", + new ReadAllViaFileBasedSource<>( + getDesiredBundleSizeBytes(), + new CreateTextSourceFn(getDelimiter()), + StringUtf8Coder.of())); + } + private static class CreateTextSourceFn implements SerializableFunction<String, FileBasedSource<String>> { private byte[] delimiter; http://git-wip-us.apache.org/repos/asf/beam/blob/513d26c5/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java index e708b46..3a8757e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java @@ -380,7 +380,7 @@ public class TextIOReadTest { .containsInAnyOrder(expected); TextIO.ReadAll readAll = - TextIO.readAll().withCompression(compression).withDesiredBundleSizeBytes(10); + TextIO.readAll().withCompression(compression); PAssert.that( p.apply("Create_" + file + "_" + thisUniquifier, Create.of(file.getPath())) .apply("Read_" + compression.toString() + "_" + thisUniquifier, readAll)) @@ -926,9 +926,9 @@ public class TextIOReadTest { @Category(NeedsRunner.class) public void testReadAll() throws IOException { writeToFile(TINY, "readAllTiny1.zip", ZIP); - writeToFile(TINY, "readAllTiny2.zip", ZIP); + writeToFile(TINY, "readAllTiny2.txt", UNCOMPRESSED); writeToFile(LARGE, "readAllLarge1.zip", ZIP); - writeToFile(LARGE, "readAllLarge2.zip", ZIP); + writeToFile(LARGE, "readAllLarge2.txt", UNCOMPRESSED); PCollection<String> lines = p.apply( Create.of( @@ -940,6 +940,25 @@ public class TextIOReadTest { } @Test + @Category(NeedsRunner.class) + public void testReadFiles() throws IOException { + writeToFile(TINY, "readAllTiny1.zip", ZIP); + writeToFile(TINY, "readAllTiny2.txt", UNCOMPRESSED); + writeToFile(LARGE, "readAllLarge1.zip", ZIP); + writeToFile(LARGE, "readAllLarge2.txt", UNCOMPRESSED); + PCollection<String> lines = + p.apply( + Create.of( + tempFolder.resolve("readAllTiny*").toString(), + tempFolder.resolve("readAllLarge*").toString())) + .apply(FileIO.matchAll()) + .apply(FileIO.readMatches().withCompression(AUTO)) + .apply(TextIO.readFiles().withDesiredBundleSizeBytes(10)); + PAssert.that(lines).containsInAnyOrder(Iterables.concat(TINY, TINY, LARGE, LARGE)); + p.run(); + } + + @Test @Category({NeedsRunner.class, UsesSplittableParDo.class}) public void testReadWatchForNewFiles() throws IOException, InterruptedException { final Path basePath = tempFolder.resolve("readWatch");