Uses FileIO.readMatches() in TextIO and AvroIO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6d4a7851 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6d4a7851 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6d4a7851 Branch: refs/heads/master Commit: 6d4a78517708db3bd89cfeff5a7e62fb6b948e1d Parents: 910d02f Author: Eugene Kirpichov <kirpic...@google.com> Authored: Thu Aug 31 16:28:07 2017 -0700 Committer: Eugene Kirpichov <ekirpic...@gmail.com> Committed: Sun Sep 3 16:32:25 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/AvroIO.java | 14 ++-- .../java/org/apache/beam/sdk/io/FileIO.java | 82 ++++++++++---------- .../beam/sdk/io/ReadAllViaFileBasedSource.java | 59 +++++++------- .../apache/beam/sdk/io/ReadableFileCoder.java | 25 ++---- .../java/org/apache/beam/sdk/io/TextIO.java | 34 +++----- .../java/org/apache/beam/sdk/io/FileIOTest.java | 12 +-- 6 files changed, 96 insertions(+), 130 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6d4a7851/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index c4711e8..108054f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.io; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; @@ -597,13 +598,13 @@ public class AvroIO { 
checkNotNull(getSchema(), "schema"); return input .apply(FileIO.matchAll().withConfiguration(getMatchConfiguration())) + .apply(FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT)) .apply( "Read all via FileBasedSource", new ReadAllViaFileBasedSource<>( - SerializableFunctions.<String, Boolean>constant(true) /* isSplittable */, getDesiredBundleSizeBytes(), - new CreateSourceFn<>(getRecordClass(), getSchema().toString()))) - .setCoder(AvroCoder.of(getRecordClass(), getSchema())); + new CreateSourceFn<>(getRecordClass(), getSchema().toString()), + AvroCoder.of(getRecordClass(), getSchema()))); } @Override @@ -804,13 +805,10 @@ public class AvroIO { new CreateParseSourceFn<>(parseFn, coder); return input .apply(FileIO.matchAll().withConfiguration(getMatchConfiguration())) + .apply(FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT)) .apply( "Parse all via FileBasedSource", - new ReadAllViaFileBasedSource<>( - SerializableFunctions.<String, Boolean>constant(true) /* isSplittable */, - getDesiredBundleSizeBytes(), - createSource)) - .setCoder(coder); + new ReadAllViaFileBasedSource<>(getDesiredBundleSizeBytes(), createSource, coder)); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/6d4a7851/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java index fcae0f7..c909c3c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java @@ -32,7 +32,6 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; import org.apache.beam.sdk.io.fs.MatchResult; -import org.apache.beam.sdk.io.fs.ResourceId; import 
org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -104,39 +103,18 @@ public class FileIO { } /** A utility class for accessing a potentially compressed file. */ - public final static class ReadableFile { - private final ResourceId resourceId; - private final long sizeBytes; - private final boolean isSeekable; + public static final class ReadableFile { + private final MatchResult.Metadata metadata; private final Compression compression; - ReadableFile( - ResourceId resourceId, - long sizeBytes, - boolean isSeekable, - Compression compression) { - this.resourceId = resourceId; - this.sizeBytes = sizeBytes; - this.isSeekable = isSeekable; + ReadableFile(MatchResult.Metadata metadata, Compression compression) { + this.metadata = metadata; this.compression = compression; } - /** Returns the {@link ResourceId} of the file. */ - public ResourceId getResourceId() { - return resourceId; - } - - /** Returns the size of the file in bytes (before decompression). */ - public long getSizeBytes() { - return sizeBytes; - } - - /** - * Returns whether or not the channel returned by {@link #open} can be efficiently seeked. - * If true, then {@link #open} will return a {@link SeekableByteChannel}. - */ - public boolean isSeekable() { - return isSeekable; + /** Returns the {@link MatchResult.Metadata} of the file. */ + public MatchResult.Metadata getMetadata() { + return metadata; } /** Returns the method with which this file will be decompressed in {@link #open}. */ @@ -149,15 +127,18 @@ public class FileIO { * decompressing it using {@link #getCompression}. */ public ReadableByteChannel open() throws IOException { - return compression.readDecompressed(FileSystems.open(resourceId)); + return compression.readDecompressed(FileSystems.open(metadata.resourceId())); } /** * Returns a {@link SeekableByteChannel} equivalent to {@link #open}, but fails if this file is - * not {@link #isSeekable() seekable}. 
+ * not {@link MatchResult.Metadata#isReadSeekEfficient seekable}. */ public SeekableByteChannel openSeekable() throws IOException { - checkState(isSeekable(), "The file %s is not seekable", resourceId); + checkState( + getMetadata().isReadSeekEfficient(), + "The file %s is not seekable", + metadata.resourceId()); return ((SeekableByteChannel) open()); } @@ -173,8 +154,8 @@ public class FileIO { } /** - * Describes configuration for matching filepatterns, such as {@link EmptyMatchTreatment} - * and continuous watching for matching files. + * Describes configuration for matching filepatterns, such as {@link EmptyMatchTreatment} and + * continuous watching for matching files. */ @AutoValue public abstract static class MatchConfiguration implements HasDisplayData, Serializable { @@ -186,16 +167,23 @@ public class FileIO { } abstract EmptyMatchTreatment getEmptyMatchTreatment(); - @Nullable abstract Duration getWatchInterval(); - @Nullable abstract TerminationCondition<String, ?> getWatchTerminationCondition(); + + @Nullable + abstract Duration getWatchInterval(); + + @Nullable + abstract TerminationCondition<String, ?> getWatchTerminationCondition(); abstract Builder toBuilder(); @AutoValue.Builder abstract static class Builder { abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment); + abstract Builder setWatchInterval(Duration watchInterval); + abstract Builder setWatchTerminationCondition(TerminationCondition<String, ?> condition); + abstract MatchConfiguration build(); } @@ -228,14 +216,19 @@ public class FileIO { /** Implementation of {@link #match}. 
*/ @AutoValue public abstract static class Match extends PTransform<PBegin, PCollection<MatchResult.Metadata>> { - @Nullable abstract ValueProvider<String> getFilepattern(); + @Nullable + abstract ValueProvider<String> getFilepattern(); + abstract MatchConfiguration getConfiguration(); + abstract Builder toBuilder(); @AutoValue.Builder abstract static class Builder { abstract Builder setFilepattern(ValueProvider<String> filepattern); + abstract Builder setConfiguration(MatchConfiguration configuration); + abstract Match build(); } @@ -283,11 +276,13 @@ public class FileIO { public abstract static class MatchAll extends PTransform<PCollection<String>, PCollection<MatchResult.Metadata>> { abstract MatchConfiguration getConfiguration(); + abstract Builder toBuilder(); @AutoValue.Builder abstract static class Builder { abstract Builder setConfiguration(MatchConfiguration configuration); + abstract MatchAll build(); } @@ -363,6 +358,7 @@ public class FileIO { } abstract Compression getCompression(); + abstract DirectoryTreatment getDirectoryTreatment(); abstract Builder toBuilder(); @@ -370,6 +366,7 @@ public class FileIO { @AutoValue.Builder abstract static class Builder { abstract Builder setCompression(Compression compression); + abstract Builder setDirectoryTreatment(DirectoryTreatment directoryTreatment); abstract ReadMatches build(); @@ -412,7 +409,7 @@ public class FileIO { public void process(ProcessContext c) { MatchResult.Metadata metadata = c.element(); if (metadata.resourceId().isDirectory()) { - switch(spec.getDirectoryTreatment()) { + switch (spec.getDirectoryTreatment()) { case SKIP: return; @@ -432,9 +429,12 @@ public class FileIO { : spec.getCompression(); c.output( new ReadableFile( - metadata.resourceId(), - metadata.sizeBytes(), - metadata.isReadSeekEfficient() && compression == Compression.UNCOMPRESSED, + MatchResult.Metadata.builder() + .setResourceId(metadata.resourceId()) + .setSizeBytes(metadata.sizeBytes()) + .setIsReadSeekEfficient( + 
metadata.isReadSeekEfficient() && compression == Compression.UNCOMPRESSED) + .build(), compression)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/6d4a7851/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java index 0cd7105..03b9b55 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java @@ -17,10 +17,10 @@ */ package org.apache.beam.sdk.io; -import static com.google.common.base.Preconditions.checkArgument; - import java.io.IOException; import java.util.concurrent.ThreadLocalRandom; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.FileIO.ReadableFile; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.io.range.OffsetRange; @@ -38,31 +38,30 @@ import org.apache.beam.sdk.values.PCollection; * splitting files into offset ranges and for creating a {@link FileBasedSource} for a file. The * input {@link PCollection} must not contain {@link ResourceId#isDirectory directories}. * - * <p>To obtain the collection of {@link Metadata} from a filepattern, use {@link - * FileIO#match} or {@link FileIO#matchAll}. + * <p>To obtain the collection of {@link Metadata} from a filepattern, use {@link FileIO#match} or + * {@link FileIO#matchAll}. 
*/ -class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<Metadata>, PCollection<T>> { - private final SerializableFunction<String, Boolean> isSplittable; +class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<ReadableFile>, PCollection<T>> { private final long desiredBundleSizeBytes; private final SerializableFunction<String, FileBasedSource<T>> createSource; + private final Coder<T> coder; public ReadAllViaFileBasedSource( - SerializableFunction<String, Boolean> isSplittable, long desiredBundleSizeBytes, - SerializableFunction<String, FileBasedSource<T>> createSource) { - this.isSplittable = isSplittable; + SerializableFunction<String, FileBasedSource<T>> createSource, + Coder<T> coder) { this.desiredBundleSizeBytes = desiredBundleSizeBytes; this.createSource = createSource; + this.coder = coder; } @Override - public PCollection<T> expand(PCollection<Metadata> input) { + public PCollection<T> expand(PCollection<ReadableFile> input) { return input - .apply( - "Split into ranges", - ParDo.of(new SplitIntoRangesFn(isSplittable, desiredBundleSizeBytes))) - .apply("Reshuffle", new ReshuffleWithUniqueKey<KV<Metadata, OffsetRange>>()) - .apply("Read ranges", ParDo.of(new ReadFileRangesFn<T>(createSource))); + .apply("Split into ranges", ParDo.of(new SplitIntoRangesFn(desiredBundleSizeBytes))) + .apply("Reshuffle", new ReshuffleWithUniqueKey<KV<ReadableFile, OffsetRange>>()) + .apply("Read ranges", ParDo.of(new ReadFileRangesFn<T>(createSource))) + .setCoder(coder); } private static class ReshuffleWithUniqueKey<T> @@ -90,36 +89,28 @@ class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<Metadata>, PCo } } - private static class SplitIntoRangesFn extends DoFn<Metadata, KV<Metadata, OffsetRange>> { - private final SerializableFunction<String, Boolean> isSplittable; + private static class SplitIntoRangesFn extends DoFn<ReadableFile, KV<ReadableFile, OffsetRange>> { private final long desiredBundleSizeBytes; - private SplitIntoRangesFn( - 
SerializableFunction<String, Boolean> isSplittable, long desiredBundleSizeBytes) { - this.isSplittable = isSplittable; + private SplitIntoRangesFn(long desiredBundleSizeBytes) { this.desiredBundleSizeBytes = desiredBundleSizeBytes; } @ProcessElement public void process(ProcessContext c) { - Metadata metadata = c.element(); - checkArgument( - !metadata.resourceId().isDirectory(), - "Resource %s is a directory", - metadata.resourceId()); - if (!metadata.isReadSeekEfficient() - || !isSplittable.apply(metadata.resourceId().toString())) { - c.output(KV.of(metadata, new OffsetRange(0, metadata.sizeBytes()))); + Metadata metadata = c.element().getMetadata(); + if (!metadata.isReadSeekEfficient()) { + c.output(KV.of(c.element(), new OffsetRange(0, metadata.sizeBytes()))); return; } for (OffsetRange range : new OffsetRange(0, metadata.sizeBytes()).split(desiredBundleSizeBytes, 0)) { - c.output(KV.of(metadata, range)); + c.output(KV.of(c.element(), range)); } } } - private static class ReadFileRangesFn<T> extends DoFn<KV<Metadata, OffsetRange>, T> { + private static class ReadFileRangesFn<T> extends DoFn<KV<ReadableFile, OffsetRange>, T> { private final SerializableFunction<String, FileBasedSource<T>> createSource; private ReadFileRangesFn(SerializableFunction<String, FileBasedSource<T>> createSource) { @@ -128,12 +119,14 @@ class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<Metadata>, PCo @ProcessElement public void process(ProcessContext c) throws IOException { - Metadata metadata = c.element().getKey(); + ReadableFile file = c.element().getKey(); OffsetRange range = c.element().getValue(); - FileBasedSource<T> source = createSource.apply(metadata.toString()); + FileBasedSource<T> source = + CompressedSource.from(createSource.apply(file.getMetadata().resourceId().toString())) + .withCompression(file.getCompression()); try (BoundedSource.BoundedReader<T> reader = source - .createForSubrangeOfFile(metadata, range.getFrom(), range.getTo()) + 
.createForSubrangeOfFile(file.getMetadata(), range.getFrom(), range.getTo()) .createReader(c.getPipelineOptions())) { for (boolean more = reader.start(); more; more = reader.advance()) { c.output(reader.getCurrent()); http://git-wip-us.apache.org/repos/asf/beam/blob/6d4a7851/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java index 4ef069c..51bb83e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java @@ -21,22 +21,15 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.BooleanCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.coders.VarLongCoder; -import org.apache.beam.sdk.io.fs.ResourceId; -import org.apache.beam.sdk.io.fs.ResourceIdCoder; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MetadataCoder; /** A {@link Coder} for {@link FileIO.ReadableFile}. */ public class ReadableFileCoder extends AtomicCoder<FileIO.ReadableFile> { private static final ReadableFileCoder INSTANCE = new ReadableFileCoder(); - private static final BooleanCoder IS_SEEKABLE_CODER = BooleanCoder.of(); - private static final VarIntCoder COMPRESSION_CODER = VarIntCoder.of(); - private static final ResourceIdCoder RESOURCE_ID_CODER = ResourceIdCoder.of(); - private static final VarLongCoder SIZE_CODER = VarLongCoder.of(); - /** Returns the instance of {@link ReadableFileCoder}. 
*/ public static ReadableFileCoder of() { return INSTANCE; @@ -44,18 +37,14 @@ public class ReadableFileCoder extends AtomicCoder<FileIO.ReadableFile> { @Override public void encode(FileIO.ReadableFile value, OutputStream os) throws IOException { - RESOURCE_ID_CODER.encode(value.getResourceId(), os); - SIZE_CODER.encode(value.getSizeBytes(), os); - IS_SEEKABLE_CODER.encode(value.isSeekable(), os); - COMPRESSION_CODER.encode(value.getCompression().ordinal(), os); + MetadataCoder.of().encode(value.getMetadata(), os); + VarIntCoder.of().encode(value.getCompression().ordinal(), os); } @Override public FileIO.ReadableFile decode(InputStream is) throws IOException { - ResourceId resourceId = RESOURCE_ID_CODER.decode(is); - long sizeBytes = SIZE_CODER.decode(is); - boolean isSeekable = IS_SEEKABLE_CODER.decode(is); - Compression compression = Compression.values()[COMPRESSION_CODER.decode(is)]; - return new FileIO.ReadableFile(resourceId, sizeBytes, isSeekable, compression); + MatchResult.Metadata metadata = MetadataCoder.of().decode(is); + Compression compression = Compression.values()[VarIntCoder.of().decode(is)]; + return new FileIO.ReadableFile(metadata, compression); } } http://git-wip-us.apache.org/repos/asf/beam/blob/6d4a7851/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 57bfaa9..a17928e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.io; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import static org.apache.beam.sdk.io.FileIO.ReadMatches.*; 
import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; @@ -483,12 +484,15 @@ public class TextIO { return input .apply(FileIO.matchAll().withConfiguration(getMatchConfiguration())) .apply( + FileIO.readMatches() + .withCompression(getCompression()) + .withDirectoryTreatment(DirectoryTreatment.PROHIBIT)) + .apply( "Read all via FileBasedSource", new ReadAllViaFileBasedSource<>( - new IsSplittableFn(getCompression()), getDesiredBundleSizeBytes(), - new CreateTextSourceFn(getCompression(), getDelimiter()))) - .setCoder(StringUtf8Coder.of()); + new CreateTextSourceFn(getDelimiter()), + StringUtf8Coder.of())); } @Override @@ -507,34 +511,16 @@ public class TextIO { private static class CreateTextSourceFn implements SerializableFunction<String, FileBasedSource<String>> { - private final Compression compression; private byte[] delimiter; - private CreateTextSourceFn( - Compression compression, byte[] delimiter) { - this.compression = compression; + private CreateTextSourceFn(byte[] delimiter) { this.delimiter = delimiter; } @Override public FileBasedSource<String> apply(String input) { - return CompressedSource.from( - new TextSource( - StaticValueProvider.of(input), EmptyMatchTreatment.DISALLOW, delimiter)) - .withCompression(compression); - } - } - - private static class IsSplittableFn implements SerializableFunction<String, Boolean> { - private final Compression compression; - - private IsSplittableFn(Compression compression) { - this.compression = compression; - } - - @Override - public Boolean apply(String filename) { - return !compression.isCompressed(filename); + return new TextSource( + StaticValueProvider.of(input), EmptyMatchTreatment.DISALLOW, delimiter); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/6d4a7851/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java index 341d86a..7065bff 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java @@ -257,10 +257,10 @@ public class FileIOTest implements Serializable { new SerializableFunction<FileIO.ReadableFile, Void>() { @Override public Void apply(FileIO.ReadableFile input) { - assertEquals(path, input.getResourceId().toString()); - assertEquals("Hello world".length(), input.getSizeBytes()); + assertEquals(path, input.getMetadata().resourceId().toString()); + assertEquals("Hello world".length(), input.getMetadata().sizeBytes()); assertEquals(Compression.UNCOMPRESSED, input.getCompression()); - assertTrue(input.isSeekable()); + assertTrue(input.getMetadata().isReadSeekEfficient()); try { assertEquals("Hello world", input.readFullyAsUTF8String()); } catch (IOException e) { @@ -286,10 +286,10 @@ public class FileIOTest implements Serializable { new SerializableFunction<FileIO.ReadableFile, Void>() { @Override public Void apply(FileIO.ReadableFile input) { - assertEquals(pathGZ, input.getResourceId().toString()); - assertFalse(input.getSizeBytes() == "Hello world".length()); + assertEquals(pathGZ, input.getMetadata().resourceId().toString()); + assertFalse(input.getMetadata().sizeBytes() == "Hello world".length()); assertEquals(Compression.GZIP, input.getCompression()); - assertFalse(input.isSeekable()); + assertFalse(input.getMetadata().isReadSeekEfficient()); try { assertEquals("Hello world", input.readFullyAsUTF8String()); } catch (IOException e) {