prodriguezdefino commented on code in PR #29566:
URL: https://github.com/apache/beam/pull/29566#discussion_r1476626897
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java:
##########
@@ -35,59 +35,114 @@
* <p>To obtain the collection of {@link ReadableFile} from a filepattern, use
{@link
* FileIO#readMatches()}.
*/
-public class ReadAllViaFileBasedSource<T> extends
ReadAllViaFileBasedSourceTransform<T, T> {
- public ReadAllViaFileBasedSource(
+public class ReadAllViaFileBasedSource<T, K> extends
ReadAllViaFileBasedSourceTransform<T, K> {
+
+ private final SerializableFunction<OutputContextFromFile<T>, K> outputFn;
+
+ protected ReadAllViaFileBasedSource(
long desiredBundleSizeBytes,
SerializableFunction<String, ? extends FileBasedSource<T>> createSource,
- Coder<T> coder) {
+ Coder<K> coder,
+ SerializableFunction<OutputContextFromFile<T>, K> outputFn) {
super(
desiredBundleSizeBytes,
createSource,
coder,
DEFAULT_USES_RESHUFFLE,
- new ReadFileRangesFnExceptionHandler());
+ new
ReadAllViaFileBasedSourceTransform.ReadFileRangesFnExceptionHandler());
+ this.outputFn = outputFn;
}
- public ReadAllViaFileBasedSource(
+ protected ReadAllViaFileBasedSource(
long desiredBundleSizeBytes,
SerializableFunction<String, ? extends FileBasedSource<T>> createSource,
- Coder<T> coder,
+ Coder<K> coder,
boolean usesReshuffle,
- ReadFileRangesFnExceptionHandler exceptionHandler) {
+ ReadAllViaFileBasedSourceTransform.ReadFileRangesFnExceptionHandler
exceptionHandler,
+ SerializableFunction<OutputContextFromFile<T>, K> outputFn) {
super(desiredBundleSizeBytes, createSource, coder, usesReshuffle,
exceptionHandler);
+ this.outputFn = outputFn;
+ }
+
+ public static <InputT> ReadAllViaFileBasedSource<InputT, InputT> create(
+ long desiredBundleSizeBytes,
+ SerializableFunction<String, ? extends FileBasedSource<InputT>>
createSource,
+ Coder<InputT> coder,
+ boolean usesReshuffle,
+ ReadAllViaFileBasedSourceTransform.ReadFileRangesFnExceptionHandler
exceptionHandler) {
+ return new ReadAllViaFileBasedSource<>(
+ desiredBundleSizeBytes,
+ createSource,
+ coder,
+ usesReshuffle,
+ exceptionHandler,
+ outputArguments -> outputArguments.reader().getCurrent());
+ }
+
+ public static <InputT> ReadAllViaFileBasedSource<InputT, InputT> create(
+ long desiredBundleSizeBytes,
+ SerializableFunction<String, ? extends FileBasedSource<InputT>>
createSource,
+ Coder<InputT> coder) {
+ return create(
+ desiredBundleSizeBytes,
+ createSource,
+ coder,
+ outputArguments -> outputArguments.reader().getCurrent());
+ }
+
+ public static <InputT, OutputT> ReadAllViaFileBasedSource<InputT, OutputT>
create(
+ long desiredBundleSizeBytes,
+ SerializableFunction<String, ? extends FileBasedSource<InputT>>
createSource,
+ Coder<OutputT> coder,
+ SerializableFunction<OutputContextFromFile<InputT>, OutputT> outputFn) {
+ return new ReadAllViaFileBasedSource<>(desiredBundleSizeBytes,
createSource, coder, outputFn);
}
@Override
- protected DoFn<KV<ReadableFile, OffsetRange>, T> readRangesFn() {
- return new ReadFileRangesFn<>(createSource, exceptionHandler);
+ protected DoFn<KV<ReadableFile, OffsetRange>, K> readRangesFn() {
+ return new ReadFileRangesFn<>(outputFn, createSource, exceptionHandler);
}
- private static class ReadFileRangesFn<T> extends AbstractReadFileRangesFn<T,
T> {
+ private static class ReadFileRangesFn<T, K> extends
AbstractReadFileRangesFn<T, K> {
+ private final SerializableFunction<OutputContextFromFile<T>, K> outputFn;
+
public ReadFileRangesFn(
+ final SerializableFunction<OutputContextFromFile<T>, K> outputFn,
final SerializableFunction<String, ? extends FileBasedSource<T>>
createSource,
- final ReadFileRangesFnExceptionHandler exceptionHandler) {
+ final
ReadAllViaFileBasedSourceTransform.ReadFileRangesFnExceptionHandler
Review Comment:
This change follows from moving that static class
(`ReadFileRangesFnExceptionHandler`) to the parent class in the hierarchy. In
master, the code is in an awkward situation where the parent class depends on a
static class that is implemented in one of its child classes.
I moved `ReadFileRangesFnExceptionHandler` to the top of the hierarchy to
avoid that parent-to-child dependency, and updated the references within the
Beam framework. Again, this may break user code that references this static
class directly, so I'm happy to revert that change if preferred.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]