This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 83837eb20c1e1e4793f8410de1dc6d5864586f7f
Author: Eugene Kirpichov <kirpic...@google.com>
AuthorDate: Wed Nov 22 16:34:46 2017 -0800

    Renames spilled back to unwritten
---
 .../src/main/java/org/apache/beam/sdk/io/WriteFiles.java | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 7e04332..12f5cce 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -403,22 +403,23 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
     @Override
     public PCollection<FileResult<DestinationT>> expand(PCollection<UserT> input) {
       TupleTag<FileResult<DestinationT>> writtenRecordsTag = new TupleTag<>("writtenRecords");
-      TupleTag<KV<ShardedKey<Integer>, UserT>> spilledRecordsTag = new TupleTag<>("spilledRecords");
+      TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag =
+          new TupleTag<>("unwrittenRecords");
       PCollectionTuple writeTuple =
           input.apply(
               "WriteUnshardedBundles",
               ParDo.of(
                       new WriteUnshardedTempFilesWithSpillingFn(
-                          spilledRecordsTag, destinationCoder))
+                          unwrittenRecordsTag, destinationCoder))
                   .withSideInputs(getSideInputs())
-                  .withOutputTags(writtenRecordsTag, TupleTagList.of(spilledRecordsTag)));
+                  .withOutputTags(writtenRecordsTag, TupleTagList.of(unwrittenRecordsTag)));
       PCollection<FileResult<DestinationT>> writtenBundleFiles =
           writeTuple.get(writtenRecordsTag).setCoder(fileResultCoder);
       // Any "spilled" elements are written using WriteShardedBundles. Assign shard numbers in
       // finalize to stay consistent with what WriteWindowedBundles does.
       PCollection<FileResult<DestinationT>> writtenSpilledFiles =
           writeTuple
-              .get(spilledRecordsTag)
+              .get(unwrittenRecordsTag)
               .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
               // Here we group by a synthetic shard number in the range [0, spill factor),
               // just for the sake of getting some parallelism within each destination when
@@ -426,9 +427,9 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
               // number assigned at all. Drop the shard number on the spilled records so that
               // shard numbers are assigned together to both the spilled and non-spilled files in
               // finalize.
-              .apply("GroupSpilled", GroupByKey.<ShardedKey<Integer>, UserT>create())
+              .apply("GroupUnwritten", GroupByKey.<ShardedKey<Integer>, UserT>create())
               .apply(
-                  "WriteSpilled",
+                  "WriteUnwritten",
                   ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(getSideInputs()))
               .setCoder(fileResultCoder)
               .apply(

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <commits@beam.apache.org>.