Introduces FileIO.readMatches() and FileIO.ReadableFile
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/910d02fb Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/910d02fb Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/910d02fb Branch: refs/heads/master Commit: 910d02fb464d8a91c7149f214419964cb834639b Parents: 80b9cf9 Author: Eugene Kirpichov <kirpic...@google.com> Authored: Thu Aug 31 16:11:25 2017 -0700 Committer: Eugene Kirpichov <ekirpic...@gmail.com> Committed: Sun Sep 3 16:32:25 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/coders/CoderRegistry.java | 6 + .../java/org/apache/beam/sdk/io/FileIO.java | 181 ++++++++++- .../apache/beam/sdk/io/ReadableFileCoder.java | 61 ++++ .../java/org/apache/beam/sdk/io/FileIOTest.java | 313 +++++++++++++++++++ 4 files changed, 559 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/910d02fb/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java index c335bda..012d6de 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java @@ -43,6 +43,8 @@ import java.util.Set; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.CannotProvideCoderException.ReasonCode; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.ReadableFileCoder; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.io.fs.MetadataCoder; import org.apache.beam.sdk.io.fs.ResourceId; @@ -119,6 +121,10 @@ public class 
CoderRegistry { CoderProviders.fromStaticMethods(Metadata.class, MetadataCoder.class)); builder.put(ResourceId.class, CoderProviders.fromStaticMethods(ResourceId.class, ResourceIdCoder.class)); + builder.put( + FileIO.ReadableFile.class, + CoderProviders.fromStaticMethods( + FileIO.ReadableFile.class, ReadableFileCoder.class)); builder.put(Set.class, CoderProviders.fromStaticMethods(Set.class, SetCoder.class)); builder.put(String.class, http://git-wip-us.apache.org/repos/asf/beam/blob/910d02fb/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java index 1eb81df..fcae0f7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java @@ -17,13 +17,22 @@ */ package org.apache.beam.sdk.io; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + import com.google.auto.value.AutoValue; +import java.io.IOException; import java.io.Serializable; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.SeekableByteChannel; +import java.nio.charset.StandardCharsets; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -34,6 +43,7 @@ import org.apache.beam.sdk.transforms.Watch; import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition; import 
org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.util.StreamUtils; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.joda.time.Duration; @@ -43,7 +53,7 @@ import org.slf4j.LoggerFactory; /** * Transforms for working with files. Currently includes matching of filepatterns via {@link #match} - * and {@link #matchAll}. + * and {@link #matchAll}, and reading matches via {@link #readMatches}. */ public class FileIO { private static final Logger LOG = LoggerFactory.getLogger(FileIO.class); @@ -83,6 +93,86 @@ public class FileIO { } /** + * Converts each result of {@link #match} or {@link #matchAll} to a {@link ReadableFile} which can + * be used to read the contents of each file, optionally decompressing it. + */ + public static ReadMatches readMatches() { + return new AutoValue_FileIO_ReadMatches.Builder() + .setCompression(Compression.AUTO) + .setDirectoryTreatment(ReadMatches.DirectoryTreatment.SKIP) + .build(); + } + + /** A utility class for accessing a potentially compressed file. */ + public static final class ReadableFile { + private final ResourceId resourceId; + private final long sizeBytes; + private final boolean isSeekable; + private final Compression compression; + + ReadableFile( + ResourceId resourceId, + long sizeBytes, + boolean isSeekable, + Compression compression) { + this.resourceId = resourceId; + this.sizeBytes = sizeBytes; + this.isSeekable = isSeekable; + this.compression = compression; + } + + /** Returns the {@link ResourceId} of the file. */ + public ResourceId getResourceId() { + return resourceId; + } + + /** Returns the size of the file in bytes (before decompression). */ + public long getSizeBytes() { + return sizeBytes; + } + + /** + * Returns whether or not the channel returned by {@link #open} can be efficiently seeked. + * If true, then {@link #open} will return a {@link SeekableByteChannel}.
+ */ + public boolean isSeekable() { + return isSeekable; + } + + /** Returns the method with which this file will be decompressed in {@link #open}. */ + public Compression getCompression() { + return compression; + } + + /** + * Returns a {@link ReadableByteChannel} reading the data from this file, potentially + * decompressing it using {@link #getCompression}. The caller must close the returned channel. + */ + public ReadableByteChannel open() throws IOException { + return compression.readDecompressed(FileSystems.open(resourceId)); + } + + /** + * Returns a {@link SeekableByteChannel} equivalent to {@link #open}, but fails if this file is + * not {@link #isSeekable() seekable}. + */ + public SeekableByteChannel openSeekable() throws IOException { + checkState(isSeekable(), "The file %s is not seekable", resourceId); + return ((SeekableByteChannel) open()); + } + + /** + * Returns the full contents of the file as bytes, decompressed according to + * {@link #getCompression}. The channel opened to read the file is closed before returning. + */ + public byte[] readFullyAsBytes() throws IOException { + try (ReadableByteChannel channel = open()) { + return StreamUtils.getBytes(Channels.newInputStream(channel)); + } + } + + /** Returns the full contents of the file as a {@link String} decoded as UTF-8. */ + public String readFullyAsUTF8String() throws IOException { + return new String(readFullyAsBytes(), StandardCharsets.UTF_8); + } + } + + /** * Describes configuration for matching filepatterns, such as {@link EmptyMatchTreatment} * and continuous watching for matching files. */ @@ -138,7 +228,7 @@ public class FileIO { /** Implementation of {@link #match}. */ @AutoValue public abstract static class Match extends PTransform<PBegin, PCollection<MatchResult.Metadata>> { - abstract ValueProvider<String> getFilepattern(); + @Nullable abstract ValueProvider<String> getFilepattern(); abstract MatchConfiguration getConfiguration(); abstract Builder toBuilder(); @@ -262,4 +352,91 @@ public class FileIO { } } } + + /** Implementation of {@link #readMatches}.
*/ + @AutoValue + public abstract static class ReadMatches + extends PTransform<PCollection<MatchResult.Metadata>, PCollection<ReadableFile>> { + /** How {@link ReadMatches} handles directories: {@code SKIP} drops them, {@code PROHIBIT} throws. */ + public enum DirectoryTreatment { + SKIP, + PROHIBIT + } + + abstract Compression getCompression(); + abstract DirectoryTreatment getDirectoryTreatment(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setCompression(Compression compression); + abstract Builder setDirectoryTreatment(DirectoryTreatment directoryTreatment); + + abstract ReadMatches build(); + } + + /** Reads files using the given {@link Compression}. Default is {@link Compression#AUTO}. */ + public ReadMatches withCompression(Compression compression) { + checkArgument(compression != null, "compression can not be null"); + return toBuilder().setCompression(compression).build(); + } + + /** + * Controls how to handle directories in the input {@link PCollection}. Default is {@link + * DirectoryTreatment#SKIP}. + */ + public ReadMatches withDirectoryTreatment(DirectoryTreatment directoryTreatment) { + checkArgument(directoryTreatment != null, "directoryTreatment can not be null"); + return toBuilder().setDirectoryTreatment(directoryTreatment).build(); + } + + @Override + public PCollection<ReadableFile> expand(PCollection<MatchResult.Metadata> input) { + return input.apply(ParDo.of(new ToReadableFileFn(this))); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add(DisplayData.item("compression", getCompression().toString())); + builder.add(DisplayData.item("directoryTreatment", getDirectoryTreatment().toString())); + } + + /** + * Converts each {@link MatchResult.Metadata} into a {@link ReadableFile}, applying the + * {@link DirectoryTreatment} and {@link Compression} configured on the spec. + */ + private static class ToReadableFileFn extends DoFn<MatchResult.Metadata, ReadableFile> { + private final ReadMatches spec; + + private ToReadableFileFn(ReadMatches spec) { + this.spec = spec; + } + + @ProcessElement + public void process(ProcessContext c) { + MatchResult.Metadata metadata = c.element(); + if (metadata.resourceId().isDirectory())
{ + switch (spec.getDirectoryTreatment()) { + case SKIP: + return; + + case PROHIBIT: + throw new IllegalArgumentException( + "Trying to read " + metadata.resourceId() + " which is a directory"); + + default: + throw new UnsupportedOperationException( + "Unknown DirectoryTreatment: " + spec.getDirectoryTreatment()); + } + } + + Compression compression = + (spec.getCompression() == Compression.AUTO) + ? Compression.detect(metadata.resourceId().getFilename()) + : spec.getCompression(); + // Only UNCOMPRESSED files are reported as seekable: for any other compression, open() + // returns the channel produced by Compression#readDecompressed. + c.output( + new ReadableFile( + metadata.resourceId(), + metadata.sizeBytes(), + metadata.isReadSeekEfficient() && compression == Compression.UNCOMPRESSED, + compression)); + } + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/910d02fb/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java new file mode 100644 index 0000000..4ef069c --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.BooleanCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.io.fs.ResourceIdCoder; + +/** + * A {@link Coder} for {@link FileIO.ReadableFile}. + * + * <p>Encodes, in order: the {@link ResourceId}, the size in bytes, the seekability flag, and the + * ordinal of the {@link Compression}. Because the compression is stored by ordinal, reordering the + * constants of {@link Compression} would make previously encoded values decode incorrectly. + */ +public class ReadableFileCoder extends AtomicCoder<FileIO.ReadableFile> { + private static final ReadableFileCoder INSTANCE = new ReadableFileCoder(); + + private static final BooleanCoder IS_SEEKABLE_CODER = BooleanCoder.of(); + private static final VarIntCoder COMPRESSION_CODER = VarIntCoder.of(); + private static final ResourceIdCoder RESOURCE_ID_CODER = ResourceIdCoder.of(); + private static final VarLongCoder SIZE_CODER = VarLongCoder.of(); + + /** Returns the instance of {@link ReadableFileCoder}.
*/ + public static ReadableFileCoder of() { + return INSTANCE; + } + + @Override + public void encode(FileIO.ReadableFile value, OutputStream os) throws IOException { + RESOURCE_ID_CODER.encode(value.getResourceId(), os); + SIZE_CODER.encode(value.getSizeBytes(), os); + IS_SEEKABLE_CODER.encode(value.isSeekable(), os); + COMPRESSION_CODER.encode(value.getCompression().ordinal(), os); + } + + @Override + public FileIO.ReadableFile decode(InputStream is) throws IOException { + ResourceId resourceId = RESOURCE_ID_CODER.decode(is); + long sizeBytes = SIZE_CODER.decode(is); + boolean isSeekable = IS_SEEKABLE_CODER.decode(is); + int compressionOrdinal = COMPRESSION_CODER.decode(is); + Compression[] compressions = Compression.values(); + // Reject corrupt or incompatible input with a descriptive CoderException instead of an + // ArrayIndexOutOfBoundsException. + if (compressionOrdinal < 0 || compressionOrdinal >= compressions.length) { + throw new CoderException("Invalid Compression ordinal: " + compressionOrdinal); + } + return new FileIO.ReadableFile( + resourceId, sizeBytes, isSeekable, compressions[compressionOrdinal]); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/910d02fb/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java new file mode 100644 index 0000000..341d86a --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import static org.hamcrest.Matchers.isA; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Serializable; +import java.io.Writer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; +import java.util.zip.GZIPOutputStream; +import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.UsesSplittableParDo; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.Watch; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link FileIO}.
*/ +@RunWith(JUnit4.class) +public class FileIOTest implements Serializable { + @Rule public transient TestPipeline p = TestPipeline.create(); + + @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Rule public transient ExpectedException thrown = ExpectedException.none(); + + @Test + @Category(NeedsRunner.class) + public void testMatchAndMatchAll() throws IOException { + Path firstPath = tmpFolder.newFile("first").toPath(); + Path secondPath = tmpFolder.newFile("second").toPath(); + int firstSize = 37; + int secondSize = 42; + Files.write(firstPath, new byte[firstSize]); + Files.write(secondPath, new byte[secondSize]); + + PAssert.that( + p.apply( + "Match existing", + FileIO.match().filepattern(tmpFolder.getRoot().getAbsolutePath() + "/*"))) + .containsInAnyOrder(metadata(firstPath, firstSize), metadata(secondPath, secondSize)); + PAssert.that( + p.apply( + "Match existing with provider", + FileIO.match() + .filepattern(p.newProvider(tmpFolder.getRoot().getAbsolutePath() + "/*")))) + .containsInAnyOrder(metadata(firstPath, firstSize), metadata(secondPath, secondSize)); + PAssert.that( + p.apply("Create existing", Create.of(tmpFolder.getRoot().getAbsolutePath() + "/*")) + .apply("MatchAll existing", FileIO.matchAll())) + .containsInAnyOrder(metadata(firstPath, firstSize), metadata(secondPath, secondSize)); + + PAssert.that( + p.apply( + "Match non-existing ALLOW", + FileIO.match() + .filepattern(tmpFolder.getRoot().getAbsolutePath() + "/blah") + .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))) + .containsInAnyOrder(); + PAssert.that( + p.apply( + "Create non-existing", + Create.of(tmpFolder.getRoot().getAbsolutePath() + "/blah")) + .apply( + "MatchAll non-existing ALLOW", + FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))) + .containsInAnyOrder(); + + PAssert.that( + p.apply( + "Match non-existing ALLOW_IF_WILDCARD", + FileIO.match() + .filepattern(tmpFolder.getRoot().getAbsolutePath() + "/blah*") +
.withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD))) + .containsInAnyOrder(); + PAssert.that( + p.apply( + "Create non-existing wildcard + explicit", + Create.of(tmpFolder.getRoot().getAbsolutePath() + "/blah*")) + .apply( + "MatchAll non-existing ALLOW_IF_WILDCARD", + FileIO.matchAll() + .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD))) + .containsInAnyOrder(); + PAssert.that( + p.apply( + "Create non-existing wildcard + default", + Create.of(tmpFolder.getRoot().getAbsolutePath() + "/blah*")) + .apply("MatchAll non-existing default", FileIO.matchAll())) + .containsInAnyOrder(); + + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testMatchDisallowEmptyDefault() throws IOException { + p.apply("Match", FileIO.match().filepattern(tmpFolder.getRoot().getAbsolutePath() + "/*")); + + thrown.expectCause(isA(FileNotFoundException.class)); + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testMatchDisallowEmptyExplicit() throws IOException { + p.apply( + FileIO.match() + .filepattern(tmpFolder.getRoot().getAbsolutePath() + "/*") + .withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)); + + thrown.expectCause(isA(FileNotFoundException.class)); + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testMatchDisallowEmptyNonWildcard() throws IOException { + p.apply( + FileIO.match() + .filepattern(tmpFolder.getRoot().getAbsolutePath() + "/blah") + .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)); + + thrown.expectCause(isA(FileNotFoundException.class)); + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testMatchAllDisallowEmptyExplicit() throws IOException { + p.apply(Create.of(tmpFolder.getRoot().getAbsolutePath() + "/*")) + .apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)); + thrown.expectCause(isA(FileNotFoundException.class)); + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void
testMatchAllDisallowEmptyNonWildcard() throws IOException { + p.apply(Create.of(tmpFolder.getRoot().getAbsolutePath() + "/blah")) + .apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)); + thrown.expectCause(isA(FileNotFoundException.class)); + p.run(); + } + + @Test + @Category({NeedsRunner.class, UsesSplittableParDo.class}) + public void testMatchWatchForNewFiles() throws IOException, InterruptedException { + final Path basePath = tmpFolder.getRoot().toPath().resolve("watch"); + basePath.toFile().mkdir(); + PCollection<MatchResult.Metadata> matchMetadata = + p.apply( + FileIO.match() + .filepattern(basePath.resolve("*").toString()) + .continuously( + Duration.millis(100), + Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardSeconds(3)))); + PCollection<MatchResult.Metadata> matchAllMetadata = + p.apply(Create.of(basePath.resolve("*").toString())) + .apply( + FileIO.matchAll() + .continuously( + Duration.millis(100), + Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardSeconds(3)))); + + Thread writer = + new Thread() { + @Override + public void run() { + try { + Thread.sleep(1000); + Files.write(basePath.resolve("first"), new byte[42]); + Thread.sleep(300); + Files.write(basePath.resolve("second"), new byte[37]); + Thread.sleep(300); + Files.write(basePath.resolve("third"), new byte[99]); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } + }; + writer.start(); + + List<MatchResult.Metadata> expected = + Arrays.asList( + metadata(basePath.resolve("first"), 42), + metadata(basePath.resolve("second"), 37), + metadata(basePath.resolve("third"), 99)); + PAssert.that(matchMetadata).containsInAnyOrder(expected); + PAssert.that(matchAllMetadata).containsInAnyOrder(expected); + p.run(); + + writer.join(); + } + + @Test + @Category(NeedsRunner.class) + public void testRead() throws IOException { + final String path = tmpFolder.newFile("file").getAbsolutePath(); + final String
pathGZ = tmpFolder.newFile("file.gz").getAbsolutePath(); + // Write the fixtures as UTF-8 explicitly, matching readFullyAsUTF8String() used below. + Files.write(new File(path).toPath(), "Hello world".getBytes(StandardCharsets.UTF_8)); + try (Writer writer = + new OutputStreamWriter( + new GZIPOutputStream(new FileOutputStream(pathGZ)), StandardCharsets.UTF_8)) { + writer.write("Hello world"); + } + + PCollection<MatchResult.Metadata> matches = p.apply("Match", FileIO.match().filepattern(path)); + PCollection<FileIO.ReadableFile> decompressedAuto = + matches.apply("Read AUTO", FileIO.readMatches().withCompression(Compression.AUTO)); + PCollection<FileIO.ReadableFile> decompressedDefault = + matches.apply("Read default", FileIO.readMatches()); + PCollection<FileIO.ReadableFile> decompressedUncompressed = + matches.apply( + "Read UNCOMPRESSED", FileIO.readMatches().withCompression(Compression.UNCOMPRESSED)); + for (PCollection<FileIO.ReadableFile> c : + Arrays.asList(decompressedAuto, decompressedDefault, decompressedUncompressed)) { + PAssert.thatSingleton(c) + .satisfies( + new SerializableFunction<FileIO.ReadableFile, Void>() { + @Override + public Void apply(FileIO.ReadableFile input) { + assertEquals(path, input.getResourceId().toString()); + assertEquals("Hello world".length(), input.getSizeBytes()); + assertEquals(Compression.UNCOMPRESSED, input.getCompression()); + assertTrue(input.isSeekable()); + try { + assertEquals("Hello world", input.readFullyAsUTF8String()); + } catch (IOException e) { + throw new RuntimeException(e); + } + return null; + } + }); + } + + PCollection<MatchResult.Metadata> matchesGZ = + p.apply("Match GZ", FileIO.match().filepattern(pathGZ)); + PCollection<FileIO.ReadableFile> compressionAuto = + matchesGZ.apply("Read GZ AUTO", FileIO.readMatches().withCompression(Compression.AUTO)); + PCollection<FileIO.ReadableFile> compressionDefault = + matchesGZ.apply("Read GZ default", FileIO.readMatches()); + PCollection<FileIO.ReadableFile> compressionGzip = + matchesGZ.apply("Read GZ GZIP", FileIO.readMatches().withCompression(Compression.GZIP)); + for
(PCollection<FileIO.ReadableFile> c : + Arrays.asList(compressionAuto, compressionDefault, compressionGzip)) { + PAssert.thatSingleton(c) + .satisfies( + new SerializableFunction<FileIO.ReadableFile, Void>() { + @Override + public Void apply(FileIO.ReadableFile input) { + assertEquals(pathGZ, input.getResourceId().toString()); + assertFalse(input.getSizeBytes() == "Hello world".length()); + assertEquals(Compression.GZIP, input.getCompression()); + assertFalse(input.isSeekable()); + try { + assertEquals("Hello world", input.readFullyAsUTF8String()); + } catch (IOException e) { + throw new RuntimeException(e); + } + return null; + } + }); + } + + p.run(); + } + + private static MatchResult.Metadata metadata(Path path, int size) { + return MatchResult.Metadata.builder() + .setResourceId(FileSystems.matchNewResource(path.toString(), false /* isDirectory */)) + .setIsReadSeekEfficient(true) + .setSizeBytes(size) + .build(); + } +}