prodriguezdefino commented on code in PR #29566:
URL: https://github.com/apache/beam/pull/29566#discussion_r1476626897


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java:
##########
@@ -35,59 +35,114 @@
  * <p>To obtain the collection of {@link ReadableFile} from a filepattern, use 
{@link
  * FileIO#readMatches()}.
  */
-public class ReadAllViaFileBasedSource<T> extends 
ReadAllViaFileBasedSourceTransform<T, T> {
-  public ReadAllViaFileBasedSource(
+public class ReadAllViaFileBasedSource<T, K> extends 
ReadAllViaFileBasedSourceTransform<T, K> {
+
+  private final SerializableFunction<OutputContextFromFile<T>, K> outputFn;
+
+  protected ReadAllViaFileBasedSource(
       long desiredBundleSizeBytes,
       SerializableFunction<String, ? extends FileBasedSource<T>> createSource,
-      Coder<T> coder) {
+      Coder<K> coder,
+      SerializableFunction<OutputContextFromFile<T>, K> outputFn) {
     super(
         desiredBundleSizeBytes,
         createSource,
         coder,
         DEFAULT_USES_RESHUFFLE,
-        new ReadFileRangesFnExceptionHandler());
+        new 
ReadAllViaFileBasedSourceTransform.ReadFileRangesFnExceptionHandler());
+    this.outputFn = outputFn;
   }
 
-  public ReadAllViaFileBasedSource(
+  protected ReadAllViaFileBasedSource(
       long desiredBundleSizeBytes,
       SerializableFunction<String, ? extends FileBasedSource<T>> createSource,
-      Coder<T> coder,
+      Coder<K> coder,
       boolean usesReshuffle,
-      ReadFileRangesFnExceptionHandler exceptionHandler) {
+      ReadAllViaFileBasedSourceTransform.ReadFileRangesFnExceptionHandler 
exceptionHandler,
+      SerializableFunction<OutputContextFromFile<T>, K> outputFn) {
     super(desiredBundleSizeBytes, createSource, coder, usesReshuffle, 
exceptionHandler);
+    this.outputFn = outputFn;
+  }
+
+  public static <InputT> ReadAllViaFileBasedSource<InputT, InputT> create(
+      long desiredBundleSizeBytes,
+      SerializableFunction<String, ? extends FileBasedSource<InputT>> 
createSource,
+      Coder<InputT> coder,
+      boolean usesReshuffle,
+      ReadAllViaFileBasedSourceTransform.ReadFileRangesFnExceptionHandler 
exceptionHandler) {
+    return new ReadAllViaFileBasedSource<>(
+        desiredBundleSizeBytes,
+        createSource,
+        coder,
+        usesReshuffle,
+        exceptionHandler,
+        outputArguments -> outputArguments.reader().getCurrent());
+  }
+
+  public static <InputT> ReadAllViaFileBasedSource<InputT, InputT> create(
+      long desiredBundleSizeBytes,
+      SerializableFunction<String, ? extends FileBasedSource<InputT>> 
createSource,
+      Coder<InputT> coder) {
+    return create(
+        desiredBundleSizeBytes,
+        createSource,
+        coder,
+        outputArguments -> outputArguments.reader().getCurrent());
+  }
+
+  public static <InputT, OutputT> ReadAllViaFileBasedSource<InputT, OutputT> 
create(
+      long desiredBundleSizeBytes,
+      SerializableFunction<String, ? extends FileBasedSource<InputT>> 
createSource,
+      Coder<OutputT> coder,
+      SerializableFunction<OutputContextFromFile<InputT>, OutputT> outputFn) {
+    return new ReadAllViaFileBasedSource<>(desiredBundleSizeBytes, 
createSource, coder, outputFn);
   }
 
   @Override
-  protected DoFn<KV<ReadableFile, OffsetRange>, T> readRangesFn() {
-    return new ReadFileRangesFn<>(createSource, exceptionHandler);
+  protected DoFn<KV<ReadableFile, OffsetRange>, K> readRangesFn() {
+    return new ReadFileRangesFn<>(outputFn, createSource, exceptionHandler);
   }
 
-  private static class ReadFileRangesFn<T> extends AbstractReadFileRangesFn<T, 
T> {
+  private static class ReadFileRangesFn<T, K> extends 
AbstractReadFileRangesFn<T, K> {
+    private final SerializableFunction<OutputContextFromFile<T>, K> outputFn;
+
     public ReadFileRangesFn(
+        final SerializableFunction<OutputContextFromFile<T>, K> outputFn,
         final SerializableFunction<String, ? extends FileBasedSource<T>> 
createSource,
-        final ReadFileRangesFnExceptionHandler exceptionHandler) {
+        final 
ReadAllViaFileBasedSourceTransform.ReadFileRangesFnExceptionHandler

Review Comment:
   This change follows from moving that static class
(`ReadFileRangesFnExceptionHandler`) up to the parent class in the hierarchy. On
master, the code has an odd dependency: a parent class depends on a static
class that is implemented in one of its own child classes.
   
   I moved `ReadFileRangesFnExceptionHandler` to the top of the hierarchy to
avoid this bidirectional dependency, and updated the references within the Beam
framework accordingly. Again, this may break user code that references this
static class directly, so I'm happy to revert that change if preferred.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to