This is an automated email from the ASF dual-hosted git repository.

austin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new aaf6871c7c3 Add filename-retaining ReadAllViaFileBasedSource (#26044)
aaf6871c7c3 is described below

commit aaf6871c7c335addec32bffa3b6841f9477dba56
Author: kellen <kel...@users.noreply.github.com>
AuthorDate: Wed Apr 12 23:44:33 2023 -0400

    Add filename-retaining ReadAllViaFileBasedSource (#26044)
---
 .../beam/sdk/io/ReadAllViaFileBasedSource.java     | 100 ++++-----------------
 ...ava => ReadAllViaFileBasedSourceTransform.java} |  99 +++++++++-----------
 .../io/ReadAllViaFileBasedSourceWithFilename.java  |  72 +++++++++++++++
 .../java/org/apache/beam/sdk/io/TextSource.java    |   6 +-
 .../java/org/apache/beam/sdk/io/AvroIOTest.java    |  43 +++++++++
 .../org/apache/beam/sdk/io/TextIOReadTest.java     |  47 ++++++++++
 6 files changed, 223 insertions(+), 144 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
index 35819b60ebf..851fc92838f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
@@ -17,20 +17,14 @@
  */
 package org.apache.beam.sdk.io;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.io.IOException;
 import java.io.Serializable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.FileIO.ReadableFile;
-import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -44,21 +38,12 @@ import org.apache.beam.sdk.values.PCollection;
  * FileIO#readMatches()}.
  */
 @Experimental(Kind.SOURCE_SINK)
-public class ReadAllViaFileBasedSource<T>
-    extends PTransform<PCollection<ReadableFile>, PCollection<T>> {
-
-  public static final boolean DEFAULT_USES_RESHUFFLE = true;
-  private final long desiredBundleSizeBytes;
-  private final SerializableFunction<String, ? extends FileBasedSource<T>> 
createSource;
-  private final Coder<T> coder;
-  private final ReadFileRangesFnExceptionHandler exceptionHandler;
-  private final boolean usesReshuffle;
-
+public class ReadAllViaFileBasedSource<T> extends 
ReadAllViaFileBasedSourceTransform<T, T> {
   public ReadAllViaFileBasedSource(
       long desiredBundleSizeBytes,
       SerializableFunction<String, ? extends FileBasedSource<T>> createSource,
       Coder<T> coder) {
-    this(
+    super(
         desiredBundleSizeBytes,
         createSource,
         coder,
@@ -72,79 +57,28 @@ public class ReadAllViaFileBasedSource<T>
       Coder<T> coder,
       boolean usesReshuffle,
       ReadFileRangesFnExceptionHandler exceptionHandler) {
-    this.desiredBundleSizeBytes = desiredBundleSizeBytes;
-    this.createSource = createSource;
-    this.coder = coder;
-    this.usesReshuffle = usesReshuffle;
-    this.exceptionHandler = exceptionHandler;
+    super(desiredBundleSizeBytes, createSource, coder, usesReshuffle, 
exceptionHandler);
   }
 
   @Override
-  public PCollection<T> expand(PCollection<ReadableFile> input) {
-    PCollection<KV<ReadableFile, OffsetRange>> ranges =
-        input.apply("Split into ranges", ParDo.of(new 
SplitIntoRangesFn(desiredBundleSizeBytes)));
-    if (usesReshuffle) {
-      ranges = ranges.apply("Reshuffle", Reshuffle.viaRandomKey());
-    }
-    return ranges
-        .apply("Read ranges", ParDo.of(new ReadFileRangesFn<T>(createSource, 
exceptionHandler)))
-        .setCoder(coder);
-  }
-
-  private static class SplitIntoRangesFn extends DoFn<ReadableFile, 
KV<ReadableFile, OffsetRange>> {
-    private final long desiredBundleSizeBytes;
-
-    private SplitIntoRangesFn(long desiredBundleSizeBytes) {
-      this.desiredBundleSizeBytes = desiredBundleSizeBytes;
-    }
-
-    @ProcessElement
-    public void process(ProcessContext c) {
-      Metadata metadata = c.element().getMetadata();
-      if (!metadata.isReadSeekEfficient()) {
-        c.output(KV.of(c.element(), new OffsetRange(0, metadata.sizeBytes())));
-        return;
-      }
-      for (OffsetRange range :
-          new OffsetRange(0, 
metadata.sizeBytes()).split(desiredBundleSizeBytes, 0)) {
-        c.output(KV.of(c.element(), range));
-      }
-    }
+  protected DoFn<KV<ReadableFile, OffsetRange>, T> readRangesFn() {
+    return new ReadFileRangesFn<>(createSource, exceptionHandler);
   }
 
-  private static class ReadFileRangesFn<T> extends DoFn<KV<ReadableFile, 
OffsetRange>, T> {
-    private final SerializableFunction<String, ? extends FileBasedSource<T>> 
createSource;
-    private final ReadFileRangesFnExceptionHandler exceptionHandler;
-
-    private ReadFileRangesFn(
-        SerializableFunction<String, ? extends FileBasedSource<T>> 
createSource,
-        ReadFileRangesFnExceptionHandler exceptionHandler) {
-      this.createSource = createSource;
-      this.exceptionHandler = exceptionHandler;
+  private static class ReadFileRangesFn<T> extends AbstractReadFileRangesFn<T, 
T> {
+    public ReadFileRangesFn(
+        final SerializableFunction<String, ? extends FileBasedSource<T>> 
createSource,
+        final ReadFileRangesFnExceptionHandler exceptionHandler) {
+      super(createSource, exceptionHandler);
     }
 
-    @ProcessElement
-    @SuppressFBWarnings(
-        value = "RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE",
-        justification = "https://github.com/spotbugs/spotbugs/issues/756")
-    public void process(ProcessContext c) throws IOException {
-      ReadableFile file = c.element().getKey();
-      OffsetRange range = c.element().getValue();
-      FileBasedSource<T> source =
-          
CompressedSource.from(createSource.apply(file.getMetadata().resourceId().toString()))
-              .withCompression(file.getCompression());
-      try (BoundedSource.BoundedReader<T> reader =
-          source
-              .createForSubrangeOfFile(file.getMetadata(), range.getFrom(), 
range.getTo())
-              .createReader(c.getPipelineOptions())) {
-        for (boolean more = reader.start(); more; more = reader.advance()) {
-          c.output(reader.getCurrent());
-        }
-      } catch (RuntimeException e) {
-        if (exceptionHandler.apply(file, range, e)) {
-          throw e;
-        }
-      }
+    @Override
+    protected T makeOutput(
+        final ReadableFile file,
+        final OffsetRange range,
+        final FileBasedSource<T> fileBasedSource,
+        final BoundedSource.BoundedReader<T> reader) {
+      return reader.getCurrent();
     }
   }
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceTransform.java
similarity index 56%
copy from sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
copy to sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceTransform.java
index 35819b60ebf..b5fa2f10e21 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceTransform.java
@@ -19,13 +19,9 @@ package org.apache.beam.sdk.io;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
-import java.io.Serializable;
 import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.FileIO.ReadableFile;
-import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
-import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -35,43 +31,34 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
-/**
- * Reads each file in the input {@link PCollection} of {@link ReadableFile} 
using given parameters
- * for splitting files into offset ranges and for creating a {@link 
FileBasedSource} for a file. The
- * input {@link PCollection} must not contain {@link ResourceId#isDirectory 
directories}.
- *
- * <p>To obtain the collection of {@link ReadableFile} from a filepattern, use 
{@link
- * FileIO#readMatches()}.
- */
-@Experimental(Kind.SOURCE_SINK)
-public class ReadAllViaFileBasedSource<T>
-    extends PTransform<PCollection<ReadableFile>, PCollection<T>> {
-
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public abstract class ReadAllViaFileBasedSourceTransform<InT, T>
+    extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<T>> {
   public static final boolean DEFAULT_USES_RESHUFFLE = true;
-  private final long desiredBundleSizeBytes;
-  private final SerializableFunction<String, ? extends FileBasedSource<T>> 
createSource;
-  private final Coder<T> coder;
-  private final ReadFileRangesFnExceptionHandler exceptionHandler;
-  private final boolean usesReshuffle;
+  protected final long desiredBundleSizeBytes;
+  protected final SerializableFunction<String, ? extends FileBasedSource<InT>> 
createSource;
+  protected final Coder<T> coder;
+  protected final ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler 
exceptionHandler;
+  protected final boolean usesReshuffle;
 
-  public ReadAllViaFileBasedSource(
+  public ReadAllViaFileBasedSourceTransform(
       long desiredBundleSizeBytes,
-      SerializableFunction<String, ? extends FileBasedSource<T>> createSource,
+      SerializableFunction<String, ? extends FileBasedSource<InT>> 
createSource,
       Coder<T> coder) {
     this(
         desiredBundleSizeBytes,
         createSource,
         coder,
         DEFAULT_USES_RESHUFFLE,
-        new ReadFileRangesFnExceptionHandler());
+        new ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler());
   }
 
-  public ReadAllViaFileBasedSource(
+  public ReadAllViaFileBasedSourceTransform(
       long desiredBundleSizeBytes,
-      SerializableFunction<String, ? extends FileBasedSource<T>> createSource,
+      SerializableFunction<String, ? extends FileBasedSource<InT>> 
createSource,
       Coder<T> coder,
       boolean usesReshuffle,
-      ReadFileRangesFnExceptionHandler exceptionHandler) {
+      ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler 
exceptionHandler) {
     this.desiredBundleSizeBytes = desiredBundleSizeBytes;
     this.createSource = createSource;
     this.coder = coder;
@@ -80,27 +67,28 @@ public class ReadAllViaFileBasedSource<T>
   }
 
   @Override
-  public PCollection<T> expand(PCollection<ReadableFile> input) {
-    PCollection<KV<ReadableFile, OffsetRange>> ranges =
+  public PCollection<T> expand(PCollection<FileIO.ReadableFile> input) {
+    PCollection<KV<FileIO.ReadableFile, OffsetRange>> ranges =
         input.apply("Split into ranges", ParDo.of(new 
SplitIntoRangesFn(desiredBundleSizeBytes)));
     if (usesReshuffle) {
       ranges = ranges.apply("Reshuffle", Reshuffle.viaRandomKey());
     }
-    return ranges
-        .apply("Read ranges", ParDo.of(new ReadFileRangesFn<T>(createSource, 
exceptionHandler)))
-        .setCoder(coder);
+    return ranges.apply("Read ranges", 
ParDo.of(readRangesFn())).setCoder(coder);
   }
 
-  private static class SplitIntoRangesFn extends DoFn<ReadableFile, 
KV<ReadableFile, OffsetRange>> {
+  protected abstract DoFn<KV<FileIO.ReadableFile, OffsetRange>, T> 
readRangesFn();
+
+  public static class SplitIntoRangesFn
+      extends DoFn<FileIO.ReadableFile, KV<FileIO.ReadableFile, OffsetRange>> {
     private final long desiredBundleSizeBytes;
 
-    private SplitIntoRangesFn(long desiredBundleSizeBytes) {
+    public SplitIntoRangesFn(long desiredBundleSizeBytes) {
       this.desiredBundleSizeBytes = desiredBundleSizeBytes;
     }
 
     @ProcessElement
     public void process(ProcessContext c) {
-      Metadata metadata = c.element().getMetadata();
+      MatchResult.Metadata metadata = c.element().getMetadata();
       if (!metadata.isReadSeekEfficient()) {
         c.output(KV.of(c.element(), new OffsetRange(0, metadata.sizeBytes())));
         return;
@@ -112,33 +100,40 @@ public class ReadAllViaFileBasedSource<T>
     }
   }
 
-  private static class ReadFileRangesFn<T> extends DoFn<KV<ReadableFile, 
OffsetRange>, T> {
-    private final SerializableFunction<String, ? extends FileBasedSource<T>> 
createSource;
-    private final ReadFileRangesFnExceptionHandler exceptionHandler;
+  public abstract static class AbstractReadFileRangesFn<InT, T>
+      extends DoFn<KV<FileIO.ReadableFile, OffsetRange>, T> {
+    private final SerializableFunction<String, ? extends FileBasedSource<InT>> 
createSource;
+    private final ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler 
exceptionHandler;
 
-    private ReadFileRangesFn(
-        SerializableFunction<String, ? extends FileBasedSource<T>> 
createSource,
-        ReadFileRangesFnExceptionHandler exceptionHandler) {
+    public AbstractReadFileRangesFn(
+        SerializableFunction<String, ? extends FileBasedSource<InT>> 
createSource,
+        ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler 
exceptionHandler) {
       this.createSource = createSource;
       this.exceptionHandler = exceptionHandler;
     }
 
+    protected abstract T makeOutput(
+        FileIO.ReadableFile file,
+        OffsetRange range,
+        FileBasedSource<InT> fileBasedSource,
+        BoundedSource.BoundedReader<InT> reader);
+
     @ProcessElement
     @SuppressFBWarnings(
         value = "RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE",
         justification = "https://github.com/spotbugs/spotbugs/issues/756")
     public void process(ProcessContext c) throws IOException {
-      ReadableFile file = c.element().getKey();
+      FileIO.ReadableFile file = c.element().getKey();
       OffsetRange range = c.element().getValue();
-      FileBasedSource<T> source =
+      FileBasedSource<InT> source =
           
CompressedSource.from(createSource.apply(file.getMetadata().resourceId().toString()))
               .withCompression(file.getCompression());
-      try (BoundedSource.BoundedReader<T> reader =
+      try (BoundedSource.BoundedReader<InT> reader =
           source
               .createForSubrangeOfFile(file.getMetadata(), range.getFrom(), 
range.getTo())
               .createReader(c.getPipelineOptions())) {
         for (boolean more = reader.start(); more; more = reader.advance()) {
-          c.output(reader.getCurrent());
+          c.output(makeOutput(file, range, source, reader));
         }
       } catch (RuntimeException e) {
         if (exceptionHandler.apply(file, range, e)) {
@@ -147,16 +142,4 @@ public class ReadAllViaFileBasedSource<T>
       }
     }
   }
-
-  /** A class to handle errors which occur during file reads. */
-  public static class ReadFileRangesFnExceptionHandler implements Serializable 
{
-
-    /*
-     * Applies the desired handler logic to the given exception and returns
-     * if the exception should be thrown.
-     */
-    public boolean apply(ReadableFile file, OffsetRange range, Exception e) {
-      return true;
-    }
-  }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceWithFilename.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceWithFilename.java
new file mode 100644
index 00000000000..9190c885953
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceWithFilename.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Reads each file of the input {@link PCollection} and outputs each element as the value of a
+ * {@link KV}, where the key is the filename from which that value came.
+ *
+ * <p>Reads each {@link FileIO.ReadableFile} using given parameters for splitting files into offset
+ * ranges and for creating a {@link FileBasedSource} for a file. The input {@link PCollection} must
+ * not contain {@link ResourceId#isDirectory directories}.
+ *
+ * <p>To obtain the collection of {@link FileIO.ReadableFile} from a filepattern, use {@link
+ * FileIO#readMatches()}.
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class ReadAllViaFileBasedSourceWithFilename<T>
+    extends ReadAllViaFileBasedSourceTransform<T, KV<String, T>> {
+
+  public ReadAllViaFileBasedSourceWithFilename(
+      final long desiredBundleSizeBytes,
+      final SerializableFunction<String, ? extends FileBasedSource<T>> 
createSource,
+      final Coder<KV<String, T>> coder) {
+    super(desiredBundleSizeBytes, createSource, coder);
+  }
+
+  @Override
+  protected DoFn<KV<FileIO.ReadableFile, OffsetRange>, KV<String, T>> 
readRangesFn() {
+    return new ReadFileRangesFn<>(createSource, exceptionHandler);
+  }
+
+  private static class ReadFileRangesFn<T> extends AbstractReadFileRangesFn<T, 
KV<String, T>> {
+    public ReadFileRangesFn(
+        final SerializableFunction<String, ? extends FileBasedSource<T>> 
createSource,
+        final ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler 
exceptionHandler) {
+      super(createSource, exceptionHandler);
+    }
+
+    @Override
+    protected KV<String, T> makeOutput(
+        final FileIO.ReadableFile file,
+        final OffsetRange range,
+        final FileBasedSource<T> fileBasedSource,
+        final BoundedSource.BoundedReader<T> reader) {
+      return KV.of(file.getMetadata().resourceId().toString(), 
reader.getCurrent());
+    }
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
index c800f50e49d..1b878d7ad50 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
@@ -53,16 +53,16 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
-class TextSource extends FileBasedSource<String> {
+public class TextSource extends FileBasedSource<String> {
   byte[] delimiter;
 
-  TextSource(
+  public TextSource(
       ValueProvider<String> fileSpec, EmptyMatchTreatment emptyMatchTreatment, 
byte[] delimiter) {
     super(fileSpec, emptyMatchTreatment, 1L);
     this.delimiter = delimiter;
   }
 
-  private TextSource(MatchResult.Metadata metadata, long start, long end, 
byte[] delimiter) {
+  public TextSource(MatchResult.Metadata metadata, long start, long end, 
byte[] delimiter) {
     super(metadata, 1L, start, end);
     this.delimiter = delimiter;
   }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 4c2812efe0d..e26d422ad8e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -46,6 +46,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Random;
+import java.util.stream.Collectors;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.file.CodecFactory;
@@ -64,10 +65,12 @@ import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
 import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
@@ -91,6 +94,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Repeatedly;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
@@ -319,6 +323,45 @@ public class AvroIOTest implements Serializable {
       readPipeline.run();
     }
 
+    @Test
+    @Category(NeedsRunner.class)
+    public void testReadWithFilename() throws Throwable {
+      List<GenericClass> values =
+          ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, 
"bar"));
+      File outputFile = tmpFolder.newFile("output.avro");
+
+      writePipeline
+          .apply(Create.of(values))
+          .apply(
+              AvroIO.write(GenericClass.class)
+                  .to(writePipeline.newProvider(outputFile.getAbsolutePath()))
+                  .withoutSharding());
+      writePipeline.run();
+
+      SerializableFunction<String, ? extends FileBasedSource<GenericClass>> 
createSource =
+          input ->
+              AvroSource.from(ValueProvider.StaticValueProvider.of(input))
+                  .withSchema(GenericClass.class);
+
+      final PCollection<KV<String, GenericClass>> lines =
+          readPipeline
+              
.apply(Create.of(Collections.singletonList(outputFile.getAbsolutePath())))
+              .apply(FileIO.matchAll())
+              .apply(FileIO.readMatches().withCompression(AUTO))
+              .apply(
+                  new ReadAllViaFileBasedSourceWithFilename<>(
+                      10,
+                      createSource,
+                      KvCoder.of(StringUtf8Coder.of(), 
AvroCoder.of(GenericClass.class))));
+
+      PAssert.that(lines)
+          .containsInAnyOrder(
+              values.stream()
+                  .map(v -> KV.of(outputFile.getAbsolutePath(), v))
+                  .collect(Collectors.toList()));
+      readPipeline.run();
+    }
+
     @Test
     @Category(NeedsRunner.class)
     public void testWriteThenReadCustomType() throws Throwable {
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
index 414114ff42a..efae0ba77fa 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
@@ -57,11 +57,13 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
 import java.util.zip.GZIPOutputStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.FileBasedSource.FileBasedReader;
 import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
@@ -78,6 +80,7 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.ToString;
 import org.apache.beam.sdk.transforms.Watch;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -86,6 +89,7 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Repeatedly;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
@@ -953,6 +957,49 @@ public class TextIOReadTest {
       p.run();
     }
 
+    private List<KV<String, String>> filenameKV(Path path, String fn, 
List<String> input) {
+      return input.stream()
+          .map(l -> KV.of(path.resolve(fn).toString(), l))
+          .collect(Collectors.toList());
+    }
+
+    @Test
+    @Category(NeedsRunner.class)
+    public void testReadFilesWithFilename() throws IOException {
+      Path tempFolderPath = tempFolder.getRoot().toPath();
+      writeToFile(TINY, tempFolder, "readAllTiny1.zip", ZIP);
+      writeToFile(TINY, tempFolder, "readAllTiny2.txt", UNCOMPRESSED);
+      writeToFile(LARGE, tempFolder, "readAllLarge1.zip", ZIP);
+      writeToFile(LARGE, tempFolder, "readAllLarge2.txt", UNCOMPRESSED);
+
+      SerializableFunction<String, ? extends FileBasedSource<String>> 
createSource =
+          input ->
+              new TextSource(
+                  ValueProvider.StaticValueProvider.of(input),
+                  EmptyMatchTreatment.DISALLOW,
+                  new byte[] {'\n'});
+
+      PCollection<KV<String, String>> lines =
+          p.apply(
+                  Create.of(
+                      tempFolderPath.resolve("readAllTiny*").toString(),
+                      tempFolderPath.resolve("readAllLarge*").toString()))
+              .apply(FileIO.matchAll())
+              .apply(FileIO.readMatches().withCompression(AUTO))
+              .apply(
+                  new ReadAllViaFileBasedSourceWithFilename<>(
+                      10, createSource, KvCoder.of(StringUtf8Coder.of(), 
StringUtf8Coder.of())));
+
+      PAssert.that(lines)
+          .containsInAnyOrder(
+              Iterables.concat(
+                  filenameKV(tempFolderPath, "readAllTiny1.zip", TINY),
+                  filenameKV(tempFolderPath, "readAllTiny2.txt", TINY),
+                  filenameKV(tempFolderPath, "readAllLarge1.zip", LARGE),
+                  filenameKV(tempFolderPath, "readAllLarge2.txt", LARGE)));
+      p.run();
+    }
+
     @Test
     @Category({NeedsRunner.class, UsesUnboundedSplittableParDo.class})
     public void testReadWatchForNewFiles() throws IOException, 
InterruptedException {

Reply via email to