Ban Piao created BEAM-9346:
------------------------------

             Summary: TFRecordIO inefficient read from sideinput causing pipeline to be slow
                 Key: BEAM-9346
                 URL: https://issues.apache.org/jira/browse/BEAM-9346
             Project: Beam
          Issue Type: Improvement
          Components: sdk-java-core
            Reporter: Ban Piao
In TFRecordIO, Reify.viewInGlobalWindow(input.apply(View.asList()), ListCoder.of(resultCoder)) is an inefficient way to read a large side input. The pipeline can be sped up significantly by combining the PCollection<ResultT> into a single-element PCollection<List<ResultT>> instead.

Sample code: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L412

Change

```java
return input
    .getPipeline()
    .apply(Reify.viewInGlobalWindow(input.apply(View.asList()), ListCoder.of(resultCoder)));
```

to

```java
return input.apply("ToList", Combine.globally(new ToListCombineFn<>()));
```

where ToListCombineFn is defined as:

```java
// Imports needed in the enclosing file (e.g. WriteFiles.java):
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.sdk.transforms.Combine.CombineFn;

/** Collects every input element into a single List. */
public static class ToListCombineFn<ResultT>
    extends CombineFn<ResultT, List<ResultT>, List<ResultT>> {

  @Override
  public List<ResultT> createAccumulator() {
    return new ArrayList<>();
  }

  @Override
  public List<ResultT> addInput(List<ResultT> mutableAccumulator, ResultT input) {
    // The accumulator may be mutated in place and returned.
    mutableAccumulator.add(input);
    return mutableAccumulator;
  }

  @Override
  public List<ResultT> mergeAccumulators(Iterable<List<ResultT>> accumulators) {
    Iterator<List<ResultT>> iter = accumulators.iterator();
    if (!iter.hasNext()) {
      return new ArrayList<>();
    }
    // Reuse the first partial list and append the remaining ones to it.
    List<ResultT> merged = iter.next();
    while (iter.hasNext()) {
      merged.addAll(iter.next());
    }
    return merged;
  }

  @Override
  public List<ResultT> extractOutput(List<ResultT> accumulator) {
    return accumulator;
  }
}
```
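To illustrate why merging partial lists yields the same single List element the side-input path produced, here is a minimal plain-Java sketch. It does not use the Beam SDK: SimpleCombineFn, ToListFn and combineInBundles are hypothetical names that only mimic the four CombineFn methods and the way a runner may accumulate separate bundles before merging them. It is not Beam's actual execution code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ToListCombineDemo {

  // Hypothetical stand-in for Beam's CombineFn contract (NOT the real Beam class),
  // so this sketch runs without the Beam SDK on the classpath.
  interface SimpleCombineFn<InputT, AccumT, OutputT> {
    AccumT createAccumulator();
    AccumT addInput(AccumT accumulator, InputT input);
    AccumT mergeAccumulators(Iterable<AccumT> accumulators);
    OutputT extractOutput(AccumT accumulator);
  }

  // Same logic as ToListCombineFn above, written against the stand-in interface.
  static class ToListFn<T> implements SimpleCombineFn<T, List<T>, List<T>> {
    public List<T> createAccumulator() {
      return new ArrayList<>();
    }

    public List<T> addInput(List<T> acc, T input) {
      acc.add(input);
      return acc;
    }

    public List<T> mergeAccumulators(Iterable<List<T>> accs) {
      Iterator<List<T>> iter = accs.iterator();
      if (!iter.hasNext()) {
        return new ArrayList<>();
      }
      List<T> merged = iter.next();
      while (iter.hasNext()) {
        merged.addAll(iter.next());
      }
      return merged;
    }

    public List<T> extractOutput(List<T> acc) {
      return acc;
    }
  }

  // Simulated runner: each bundle gets its own accumulator (as separate workers might),
  // then all partial accumulators are merged and extracted as ONE output element.
  static <T> List<T> combineInBundles(
      SimpleCombineFn<T, List<T>, List<T>> fn, List<List<T>> bundles) {
    List<List<T>> partials = new ArrayList<>();
    for (List<T> bundle : bundles) {
      List<T> acc = fn.createAccumulator();
      for (T element : bundle) {
        acc = fn.addInput(acc, element);
      }
      partials.add(acc);
    }
    return fn.extractOutput(fn.mergeAccumulators(partials));
  }

  public static void main(String[] args) {
    List<List<String>> bundles = Arrays.asList(
        Arrays.asList("shard-0", "shard-1"),
        Arrays.asList("shard-2"),
        Arrays.<String>asList());
    List<String> result = combineInBundles(new ToListFn<>(), bundles);
    System.out.println(result); // prints [shard-0, shard-1, shard-2]
  }
}
```

In this simulation the bundles are merged in a fixed order; in a real pipeline the order of elements in the combined list is not guaranteed, so downstream code should not rely on it.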