Ban Piao created BEAM-9346:
------------------------------

             Summary: TFRecordIO inefficient read from sideinput causing pipeline to be slow
                 Key: BEAM-9346
                 URL: https://issues.apache.org/jira/browse/BEAM-9346
             Project: Beam
          Issue Type: Improvement
          Components: sdk-java-core
            Reporter: Ban Piao


In TFRecordIO, Reify.viewInGlobalWindow(input.apply(View.asList()), ListCoder.of(resultCoder))
is an inefficient way to read a large side input.

The pipeline can be sped up significantly by combining the PCollection<ResultT> into a
single-element PCollection<List<ResultT>>.

Sample code, changing
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L412
from
```
return input
    .getPipeline()
    .apply(
        Reify.viewInGlobalWindow(
            input.apply(View.asList()), ListCoder.of(resultCoder)));
```

to

```
return input.apply("ToList", Combine.globally(new ToListCombineFn<>()));
```
where ToListCombineFn is defined as:
```
// Imports needed in the enclosing file (e.g. WriteFiles.java):
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.sdk.transforms.Combine.CombineFn;

/** Collects every element of the input into a single List. */
public static class ToListCombineFn<ResultT>
    extends CombineFn<ResultT, List<ResultT>, List<ResultT>> {

  @Override
  public List<ResultT> createAccumulator() {
    return new ArrayList<>();
  }

  @Override
  public List<ResultT> addInput(List<ResultT> mutableAccumulator, ResultT input) {
    mutableAccumulator.add(input);
    return mutableAccumulator;
  }

  @Override
  public List<ResultT> mergeAccumulators(Iterable<List<ResultT>> accumulators) {
    // Reuse the first accumulator and append the remaining ones to it.
    Iterator<List<ResultT>> iter = accumulators.iterator();
    if (!iter.hasNext()) {
      return new ArrayList<>();
    }
    List<ResultT> merged = iter.next();
    while (iter.hasNext()) {
      merged.addAll(iter.next());
    }
    return merged;
  }

  @Override
  public List<ResultT> extractOutput(List<ResultT> accumulator) {
    return accumulator;
  }
}
```
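To see how the four methods of ToListCombineFn fit together without running a pipeline, the sketch below replays a call sequence a runner might make: one accumulator per bundle, then a single merge and extract. It is a Beam-free illustration under stated assumptions: ToListFn is a hypothetical local copy of ToListCombineFn's methods (not Beam's CombineFn class), and the split into bundles is invented for the example. In a real pipeline the runner chooses the bundles, and the order of elements in the output list is not guaranteed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ToListCombineSimulation {

  // Hypothetical, Beam-free stand-in mirroring ToListCombineFn's four methods.
  static final class ToListFn<T> {
    List<T> createAccumulator() {
      return new ArrayList<>();
    }

    List<T> addInput(List<T> acc, T input) {
      acc.add(input);
      return acc;
    }

    List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
      Iterator<List<T>> iter = accumulators.iterator();
      if (!iter.hasNext()) {
        return new ArrayList<>();
      }
      List<T> merged = iter.next(); // reuse the first accumulator
      while (iter.hasNext()) {
        merged.addAll(iter.next());
      }
      return merged;
    }

    List<T> extractOutput(List<T> acc) {
      return acc;
    }
  }

  /** Builds one partial list per (hypothetical) bundle, then merges them into one list. */
  public static <T> List<T> combineBundles(List<List<T>> bundles) {
    ToListFn<T> fn = new ToListFn<>();
    List<List<T>> partials = new ArrayList<>();
    for (List<T> bundle : bundles) {
      List<T> acc = fn.createAccumulator();
      for (T element : bundle) {
        acc = fn.addInput(acc, element);
      }
      partials.add(acc);
    }
    return fn.extractOutput(fn.mergeAccumulators(partials));
  }

  public static void main(String[] args) {
    List<List<String>> bundles =
        Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"), new ArrayList<String>());
    List<String> result = combineBundles(bundles);
    System.out.println(result); // prints [a, b, c]
  }
}
```

Because mergeAccumulators appends to the first accumulator rather than copying every partial list into a fresh one, a merge costs one addAll per extra accumulator. Beam's CombineFn contract allows mergeAccumulators to modify its input accumulators and return one of them, so this reuse is permitted.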



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
