This is an automated email from the ASF dual-hosted git repository.

austin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new aaf6871c7c3 Add filename-retaining ReadAllViaFileBasedSource (#26044) aaf6871c7c3 is described below commit aaf6871c7c335addec32bffa3b6841f9477dba56 Author: kellen <kel...@users.noreply.github.com> AuthorDate: Wed Apr 12 23:44:33 2023 -0400 Add filename-retaining ReadAllViaFileBasedSource (#26044) --- .../beam/sdk/io/ReadAllViaFileBasedSource.java | 100 ++++----------------- ...ava => ReadAllViaFileBasedSourceTransform.java} | 99 +++++++++----------- .../io/ReadAllViaFileBasedSourceWithFilename.java | 72 +++++++++++++++ .../java/org/apache/beam/sdk/io/TextSource.java | 6 +- .../java/org/apache/beam/sdk/io/AvroIOTest.java | 43 +++++++++ .../org/apache/beam/sdk/io/TextIOReadTest.java | 47 ++++++++++ 6 files changed, 223 insertions(+), 144 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java index 35819b60ebf..851fc92838f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java @@ -17,20 +17,14 @@ */ package org.apache.beam.sdk.io; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.io.IOException; import java.io.Serializable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.FileIO.ReadableFile; -import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Reshuffle; import 
org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -44,21 +38,12 @@ import org.apache.beam.sdk.values.PCollection; * FileIO#readMatches()}. */ @Experimental(Kind.SOURCE_SINK) -public class ReadAllViaFileBasedSource<T> - extends PTransform<PCollection<ReadableFile>, PCollection<T>> { - - public static final boolean DEFAULT_USES_RESHUFFLE = true; - private final long desiredBundleSizeBytes; - private final SerializableFunction<String, ? extends FileBasedSource<T>> createSource; - private final Coder<T> coder; - private final ReadFileRangesFnExceptionHandler exceptionHandler; - private final boolean usesReshuffle; - +public class ReadAllViaFileBasedSource<T> extends ReadAllViaFileBasedSourceTransform<T, T> { public ReadAllViaFileBasedSource( long desiredBundleSizeBytes, SerializableFunction<String, ? extends FileBasedSource<T>> createSource, Coder<T> coder) { - this( + super( desiredBundleSizeBytes, createSource, coder, @@ -72,79 +57,28 @@ public class ReadAllViaFileBasedSource<T> Coder<T> coder, boolean usesReshuffle, ReadFileRangesFnExceptionHandler exceptionHandler) { - this.desiredBundleSizeBytes = desiredBundleSizeBytes; - this.createSource = createSource; - this.coder = coder; - this.usesReshuffle = usesReshuffle; - this.exceptionHandler = exceptionHandler; + super(desiredBundleSizeBytes, createSource, coder, usesReshuffle, exceptionHandler); } @Override - public PCollection<T> expand(PCollection<ReadableFile> input) { - PCollection<KV<ReadableFile, OffsetRange>> ranges = - input.apply("Split into ranges", ParDo.of(new SplitIntoRangesFn(desiredBundleSizeBytes))); - if (usesReshuffle) { - ranges = ranges.apply("Reshuffle", Reshuffle.viaRandomKey()); - } - return ranges - .apply("Read ranges", ParDo.of(new ReadFileRangesFn<T>(createSource, exceptionHandler))) - .setCoder(coder); - } - - private static class SplitIntoRangesFn extends DoFn<ReadableFile, KV<ReadableFile, 
OffsetRange>> { - private final long desiredBundleSizeBytes; - - private SplitIntoRangesFn(long desiredBundleSizeBytes) { - this.desiredBundleSizeBytes = desiredBundleSizeBytes; - } - - @ProcessElement - public void process(ProcessContext c) { - Metadata metadata = c.element().getMetadata(); - if (!metadata.isReadSeekEfficient()) { - c.output(KV.of(c.element(), new OffsetRange(0, metadata.sizeBytes()))); - return; - } - for (OffsetRange range : - new OffsetRange(0, metadata.sizeBytes()).split(desiredBundleSizeBytes, 0)) { - c.output(KV.of(c.element(), range)); - } - } + protected DoFn<KV<ReadableFile, OffsetRange>, T> readRangesFn() { + return new ReadFileRangesFn<>(createSource, exceptionHandler); } - private static class ReadFileRangesFn<T> extends DoFn<KV<ReadableFile, OffsetRange>, T> { - private final SerializableFunction<String, ? extends FileBasedSource<T>> createSource; - private final ReadFileRangesFnExceptionHandler exceptionHandler; - - private ReadFileRangesFn( - SerializableFunction<String, ? extends FileBasedSource<T>> createSource, - ReadFileRangesFnExceptionHandler exceptionHandler) { - this.createSource = createSource; - this.exceptionHandler = exceptionHandler; + private static class ReadFileRangesFn<T> extends AbstractReadFileRangesFn<T, T> { + public ReadFileRangesFn( + final SerializableFunction<String, ? 
extends FileBasedSource<T>> createSource, + final ReadFileRangesFnExceptionHandler exceptionHandler) { + super(createSource, exceptionHandler); } - @ProcessElement - @SuppressFBWarnings( - value = "RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE", - justification = "https://github.com/spotbugs/spotbugs/issues/756") - public void process(ProcessContext c) throws IOException { - ReadableFile file = c.element().getKey(); - OffsetRange range = c.element().getValue(); - FileBasedSource<T> source = - CompressedSource.from(createSource.apply(file.getMetadata().resourceId().toString())) - .withCompression(file.getCompression()); - try (BoundedSource.BoundedReader<T> reader = - source - .createForSubrangeOfFile(file.getMetadata(), range.getFrom(), range.getTo()) - .createReader(c.getPipelineOptions())) { - for (boolean more = reader.start(); more; more = reader.advance()) { - c.output(reader.getCurrent()); - } - } catch (RuntimeException e) { - if (exceptionHandler.apply(file, range, e)) { - throw e; - } - } + @Override + protected T makeOutput( + final ReadableFile file, + final OffsetRange range, + final FileBasedSource<T> fileBasedSource, + final BoundedSource.BoundedReader<T> reader) { + return reader.getCurrent(); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceTransform.java similarity index 56% copy from sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java copy to sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceTransform.java index 35819b60ebf..b5fa2f10e21 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceTransform.java @@ -19,13 +19,9 @@ package org.apache.beam.sdk.io; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; 
-import java.io.Serializable; import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.io.FileIO.ReadableFile; -import org.apache.beam.sdk.io.fs.MatchResult.Metadata; -import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -35,43 +31,34 @@ import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -/** - * Reads each file in the input {@link PCollection} of {@link ReadableFile} using given parameters - * for splitting files into offset ranges and for creating a {@link FileBasedSource} for a file. The - * input {@link PCollection} must not contain {@link ResourceId#isDirectory directories}. - * - * <p>To obtain the collection of {@link ReadableFile} from a filepattern, use {@link - * FileIO#readMatches()}. - */ -@Experimental(Kind.SOURCE_SINK) -public class ReadAllViaFileBasedSource<T> - extends PTransform<PCollection<ReadableFile>, PCollection<T>> { - +@Experimental(Experimental.Kind.SOURCE_SINK) +public abstract class ReadAllViaFileBasedSourceTransform<InT, T> + extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<T>> { public static final boolean DEFAULT_USES_RESHUFFLE = true; - private final long desiredBundleSizeBytes; - private final SerializableFunction<String, ? extends FileBasedSource<T>> createSource; - private final Coder<T> coder; - private final ReadFileRangesFnExceptionHandler exceptionHandler; - private final boolean usesReshuffle; + protected final long desiredBundleSizeBytes; + protected final SerializableFunction<String, ? 
extends FileBasedSource<InT>> createSource; + protected final Coder<T> coder; + protected final ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler exceptionHandler; + protected final boolean usesReshuffle; - public ReadAllViaFileBasedSource( + public ReadAllViaFileBasedSourceTransform( long desiredBundleSizeBytes, - SerializableFunction<String, ? extends FileBasedSource<T>> createSource, + SerializableFunction<String, ? extends FileBasedSource<InT>> createSource, Coder<T> coder) { this( desiredBundleSizeBytes, createSource, coder, DEFAULT_USES_RESHUFFLE, - new ReadFileRangesFnExceptionHandler()); + new ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler()); } - public ReadAllViaFileBasedSource( + public ReadAllViaFileBasedSourceTransform( long desiredBundleSizeBytes, - SerializableFunction<String, ? extends FileBasedSource<T>> createSource, + SerializableFunction<String, ? extends FileBasedSource<InT>> createSource, Coder<T> coder, boolean usesReshuffle, - ReadFileRangesFnExceptionHandler exceptionHandler) { + ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler exceptionHandler) { this.desiredBundleSizeBytes = desiredBundleSizeBytes; this.createSource = createSource; this.coder = coder; @@ -80,27 +67,28 @@ public class ReadAllViaFileBasedSource<T> } @Override - public PCollection<T> expand(PCollection<ReadableFile> input) { - PCollection<KV<ReadableFile, OffsetRange>> ranges = + public PCollection<T> expand(PCollection<FileIO.ReadableFile> input) { + PCollection<KV<FileIO.ReadableFile, OffsetRange>> ranges = input.apply("Split into ranges", ParDo.of(new SplitIntoRangesFn(desiredBundleSizeBytes))); if (usesReshuffle) { ranges = ranges.apply("Reshuffle", Reshuffle.viaRandomKey()); } - return ranges - .apply("Read ranges", ParDo.of(new ReadFileRangesFn<T>(createSource, exceptionHandler))) - .setCoder(coder); + return ranges.apply("Read ranges", ParDo.of(readRangesFn())).setCoder(coder); } - private static class SplitIntoRangesFn extends 
DoFn<ReadableFile, KV<ReadableFile, OffsetRange>> { + protected abstract DoFn<KV<FileIO.ReadableFile, OffsetRange>, T> readRangesFn(); + + public static class SplitIntoRangesFn + extends DoFn<FileIO.ReadableFile, KV<FileIO.ReadableFile, OffsetRange>> { private final long desiredBundleSizeBytes; - private SplitIntoRangesFn(long desiredBundleSizeBytes) { + public SplitIntoRangesFn(long desiredBundleSizeBytes) { this.desiredBundleSizeBytes = desiredBundleSizeBytes; } @ProcessElement public void process(ProcessContext c) { - Metadata metadata = c.element().getMetadata(); + MatchResult.Metadata metadata = c.element().getMetadata(); if (!metadata.isReadSeekEfficient()) { c.output(KV.of(c.element(), new OffsetRange(0, metadata.sizeBytes()))); return; @@ -112,33 +100,40 @@ public class ReadAllViaFileBasedSource<T> } } - private static class ReadFileRangesFn<T> extends DoFn<KV<ReadableFile, OffsetRange>, T> { - private final SerializableFunction<String, ? extends FileBasedSource<T>> createSource; - private final ReadFileRangesFnExceptionHandler exceptionHandler; + public abstract static class AbstractReadFileRangesFn<InT, T> + extends DoFn<KV<FileIO.ReadableFile, OffsetRange>, T> { + private final SerializableFunction<String, ? extends FileBasedSource<InT>> createSource; + private final ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler exceptionHandler; - private ReadFileRangesFn( - SerializableFunction<String, ? extends FileBasedSource<T>> createSource, - ReadFileRangesFnExceptionHandler exceptionHandler) { + public AbstractReadFileRangesFn( + SerializableFunction<String, ? 
extends FileBasedSource<InT>> createSource, + ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler exceptionHandler) { this.createSource = createSource; this.exceptionHandler = exceptionHandler; } + protected abstract T makeOutput( + FileIO.ReadableFile file, + OffsetRange range, + FileBasedSource<InT> fileBasedSource, + BoundedSource.BoundedReader<InT> reader); + @ProcessElement @SuppressFBWarnings( value = "RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE", justification = "https://github.com/spotbugs/spotbugs/issues/756") public void process(ProcessContext c) throws IOException { - ReadableFile file = c.element().getKey(); + FileIO.ReadableFile file = c.element().getKey(); OffsetRange range = c.element().getValue(); - FileBasedSource<T> source = + FileBasedSource<InT> source = CompressedSource.from(createSource.apply(file.getMetadata().resourceId().toString())) .withCompression(file.getCompression()); - try (BoundedSource.BoundedReader<T> reader = + try (BoundedSource.BoundedReader<InT> reader = source .createForSubrangeOfFile(file.getMetadata(), range.getFrom(), range.getTo()) .createReader(c.getPipelineOptions())) { for (boolean more = reader.start(); more; more = reader.advance()) { - c.output(reader.getCurrent()); + c.output(makeOutput(file, range, source, reader)); } } catch (RuntimeException e) { if (exceptionHandler.apply(file, range, e)) { @@ -147,16 +142,4 @@ public class ReadAllViaFileBasedSource<T> } } } - - /** A class to handle errors which occur during file reads. */ - public static class ReadFileRangesFnExceptionHandler implements Serializable { - - /* - * Applies the desired handler logic to the given exception and returns - * if the exception should be thrown. 
- */ - public boolean apply(ReadableFile file, OffsetRange range, Exception e) { - return true; - } - } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceWithFilename.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceWithFilename.java new file mode 100644 index 00000000000..9190c885953 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceWithFilename.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +/** + * Reads each file of the input {@link PCollection} and outputs each element as the value of a + * {@link KV}, where the key is the filename from which that value came. 
+ * + * <p>Reads each {@link FileIO.ReadableFile} using given parameters for splitting files into offset + * ranges and for creating a {@link FileBasedSource} for a file. The input {@link PCollection} must + * not contain {@link ResourceId#isDirectory directories}. + * + * <p>To obtain the collection of {@link FileIO.ReadableFile} from a filepattern, use {@link + * FileIO#readMatches()}. + */ +@Experimental(Experimental.Kind.SOURCE_SINK) +public class ReadAllViaFileBasedSourceWithFilename<T> + extends ReadAllViaFileBasedSourceTransform<T, KV<String, T>> { + + public ReadAllViaFileBasedSourceWithFilename( + final long desiredBundleSizeBytes, + final SerializableFunction<String, ? extends FileBasedSource<T>> createSource, + final Coder<KV<String, T>> coder) { + super(desiredBundleSizeBytes, createSource, coder); + } + + @Override + protected DoFn<KV<FileIO.ReadableFile, OffsetRange>, KV<String, T>> readRangesFn() { + return new ReadFileRangesFn<>(createSource, exceptionHandler); + } + + private static class ReadFileRangesFn<T> extends AbstractReadFileRangesFn<T, KV<String, T>> { + public ReadFileRangesFn( + final SerializableFunction<String, ? 
extends FileBasedSource<T>> createSource, + final ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler exceptionHandler) { + super(createSource, exceptionHandler); + } + + @Override + protected KV<String, T> makeOutput( + final FileIO.ReadableFile file, + final OffsetRange range, + final FileBasedSource<T> fileBasedSource, + final BoundedSource.BoundedReader<T> reader) { + return KV.of(file.getMetadata().resourceId().toString(), reader.getCurrent()); + } + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java index c800f50e49d..1b878d7ad50 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java @@ -53,16 +53,16 @@ import org.checkerframework.checker.nullness.qual.Nullable; @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) -class TextSource extends FileBasedSource<String> { +public class TextSource extends FileBasedSource<String> { byte[] delimiter; - TextSource( + public TextSource( ValueProvider<String> fileSpec, EmptyMatchTreatment emptyMatchTreatment, byte[] delimiter) { super(fileSpec, emptyMatchTreatment, 1L); this.delimiter = delimiter; } - private TextSource(MatchResult.Metadata metadata, long start, long end, byte[] delimiter) { + public TextSource(MatchResult.Metadata metadata, long start, long end, byte[] delimiter) { super(metadata, 1L, start, end); this.delimiter = delimiter; } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 4c2812efe0d..e26d422ad8e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -46,6 +46,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import 
java.util.Random; +import java.util.stream.Collectors; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.avro.file.CodecFactory; @@ -64,10 +65,12 @@ import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.DefaultCoder; +import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints; import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; @@ -91,6 +94,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Repeatedly; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; @@ -319,6 +323,45 @@ public class AvroIOTest implements Serializable { readPipeline.run(); } + @Test + @Category(NeedsRunner.class) + public void testReadWithFilename() throws Throwable { + List<GenericClass> values = + ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar")); + File outputFile = tmpFolder.newFile("output.avro"); + + writePipeline + .apply(Create.of(values)) + .apply( + AvroIO.write(GenericClass.class) + .to(writePipeline.newProvider(outputFile.getAbsolutePath())) + .withoutSharding()); + writePipeline.run(); + + SerializableFunction<String, ? 
extends FileBasedSource<GenericClass>> createSource = + input -> + AvroSource.from(ValueProvider.StaticValueProvider.of(input)) + .withSchema(GenericClass.class); + + final PCollection<KV<String, GenericClass>> lines = + readPipeline + .apply(Create.of(Collections.singletonList(outputFile.getAbsolutePath()))) + .apply(FileIO.matchAll()) + .apply(FileIO.readMatches().withCompression(AUTO)) + .apply( + new ReadAllViaFileBasedSourceWithFilename<>( + 10, + createSource, + KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(GenericClass.class)))); + + PAssert.that(lines) + .containsInAnyOrder( + values.stream() + .map(v -> KV.of(outputFile.getAbsolutePath(), v)) + .collect(Collectors.toList())); + readPipeline.run(); + } + @Test @Category(NeedsRunner.class) public void testWriteThenReadCustomType() throws Throwable { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java index 414114ff42a..efae0ba77fa 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java @@ -57,11 +57,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; import java.util.zip.GZIPOutputStream; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.FileBasedSource.FileBasedReader; import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; @@ -78,6 +80,7 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; import 
org.apache.beam.sdk.transforms.ToString; import org.apache.beam.sdk.transforms.Watch; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -86,6 +89,7 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Repeatedly; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner; @@ -953,6 +957,49 @@ public class TextIOReadTest { p.run(); } + private List<KV<String, String>> filenameKV(Path path, String fn, List<String> input) { + return input.stream() + .map(l -> KV.of(path.resolve(fn).toString(), l)) + .collect(Collectors.toList()); + } + + @Test + @Category(NeedsRunner.class) + public void testReadFilesWithFilename() throws IOException { + Path tempFolderPath = tempFolder.getRoot().toPath(); + writeToFile(TINY, tempFolder, "readAllTiny1.zip", ZIP); + writeToFile(TINY, tempFolder, "readAllTiny2.txt", UNCOMPRESSED); + writeToFile(LARGE, tempFolder, "readAllLarge1.zip", ZIP); + writeToFile(LARGE, tempFolder, "readAllLarge2.txt", UNCOMPRESSED); + + SerializableFunction<String, ? 
extends FileBasedSource<String>> createSource = + input -> + new TextSource( + ValueProvider.StaticValueProvider.of(input), + EmptyMatchTreatment.DISALLOW, + new byte[] {'\n'}); + + PCollection<KV<String, String>> lines = + p.apply( + Create.of( + tempFolderPath.resolve("readAllTiny*").toString(), + tempFolderPath.resolve("readAllLarge*").toString())) + .apply(FileIO.matchAll()) + .apply(FileIO.readMatches().withCompression(AUTO)) + .apply( + new ReadAllViaFileBasedSourceWithFilename<>( + 10, createSource, KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))); + + PAssert.that(lines) + .containsInAnyOrder( + Iterables.concat( + filenameKV(tempFolderPath, "readAllTiny1.zip", TINY), + filenameKV(tempFolderPath, "readAllTiny2.txt", TINY), + filenameKV(tempFolderPath, "readAllLarge1.zip", LARGE), + filenameKV(tempFolderPath, "readAllLarge2.txt", LARGE))); + p.run(); + } + @Test @Category({NeedsRunner.class, UsesUnboundedSplittableParDo.class}) public void testReadWatchForNewFiles() throws IOException, InterruptedException {