This is an automated email from the ASF dual-hosted git repository. jkff pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
commit 54eacf4b79993f40b9034bf429e387faeffdbdba Author: Eugene Kirpichov <kirpic...@google.com> AuthorDate: Wed Nov 15 18:58:28 2017 -0800 remove ShardAssignment --- .../java/org/apache/beam/sdk/io/WriteFiles.java | 118 +++++++++------------ 1 file changed, 48 insertions(+), 70 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index 19457e6..28ac1a5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -24,6 +24,7 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.base.Objects; import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; @@ -478,19 +479,12 @@ public class WriteFiles<UserT, DestinationT, OutputT> } } - enum ShardAssignment { ASSIGN_IN_FINALIZE, ASSIGN_WHEN_WRITING } - /* * Like {@link WriteBundles}, but where the elements for each shard have been collected into a * single iterable. */ private class WriteShardedBundles extends DoFn<KV<ShardedKey<Integer>, Iterable<UserT>>, FileResult<DestinationT>> { - ShardAssignment shardNumberAssignment; - WriteShardedBundles(ShardAssignment shardNumberAssignment) { - this.shardNumberAssignment = shardNumberAssignment; - } - @ProcessElement public void processElement(ProcessContext c, BoundedWindow window) throws Exception { sink.getDynamicDestinations().setSideInputAccessorFromProcessContext(c); @@ -527,13 +521,8 @@ public class WriteFiles<UserT, DestinationT, OutputT> writer.cleanup(); throw e; } - int shardNumber = - shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING - ? c.element().getKey().getShardNumber() - : UNKNOWN_SHARDNUM; - c.output( - new FileResult<>( - writer.getOutputFile(), shardNumber, window, c.pane(), entry.getKey())); + int shard = c.element().getKey().getShardNumber(); + c.output(new FileResult<>(writer.getOutputFile(), shard, window, c.pane(), entry.getKey())); } } @@ -672,8 +661,11 @@ public class WriteFiles<UserT, DestinationT, OutputT> // PCollection. There is a dependency between this ParDo and the first (the // WriteOperation PCollection as a side input), so this will happen after the // initial ParDo. - PCollection<FileResult<DestinationT>> results; - final PCollectionView<Integer> numShardsView; + PCollectionView<Integer> numShardsView = + (computeNumShards == null) ? null : input.apply(computeNumShards); + List<PCollectionView<Integer>> shardingSideInputs = numShardsView == null + ? ImmutableList.<PCollectionView<Integer>>of() + : ImmutableList.of(numShardsView); @SuppressWarnings("unchecked") Coder<BoundedWindow> shardedWindowCoder = (Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder(); @@ -686,74 +678,65 @@ public class WriteFiles<UserT, DestinationT, OutputT> } catch (CannotProvideCoderException | NonDeterministicException e) { throw new RuntimeException(e); } + FileResultCoder<DestinationT> fileResultCoder = + FileResultCoder.of(shardedWindowCoder, destinationCoder); + PCollection<FileResult<DestinationT>> results; if (computeNumShards == null && numShardsProvider == null) { - numShardsView = null; TupleTag<FileResult<DestinationT>> writtenRecordsTag = new TupleTag<>("writtenRecordsTag"); - TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittedRecordsTag = - new TupleTag<>("unwrittenRecordsTag"); + TupleTag<KV<ShardedKey<Integer>, UserT>> spilledRecordsTag = + new TupleTag<>("spilledRecordsTag"); String writeName = windowedWrites ? "WriteWindowedBundles" : "WriteBundles"; PCollectionTuple writeTuple = input.apply( writeName, - ParDo.of(new WriteBundles(unwrittedRecordsTag, destinationCoder)) + ParDo.of(new WriteBundles(spilledRecordsTag, destinationCoder)) .withSideInputs(sideInputs) - .withOutputTags(writtenRecordsTag, TupleTagList.of(unwrittedRecordsTag))); + .withOutputTags(writtenRecordsTag, TupleTagList.of(spilledRecordsTag))); PCollection<FileResult<DestinationT>> writtenBundleFiles = - writeTuple - .get(writtenRecordsTag) - .setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder)); + writeTuple.get(writtenRecordsTag).setCoder(fileResultCoder); // Any "spilled" elements are written using WriteShardedBundles. Assign shard numbers in // finalize to stay consistent with what WriteWindowedBundles does. - PCollection<FileResult<DestinationT>> writtenGroupedFiles = + PCollection<FileResult<DestinationT>> writtenSpilledFiles = writeTuple - .get(unwrittedRecordsTag) + .get(spilledRecordsTag) .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder())) - .apply("GroupUnwritten", GroupByKey.<ShardedKey<Integer>, UserT>create()) + // Here we group by a synthetic shard number in the range [0, spill factor), + // just for the sake of getting some parallelism within each destination when + // writing the spilled records, whereas the non-spilled records don't have a shard + // number assigned at all. Drop the shard number on the spilled records so that + // shard numbers are assigned together to both the spilled and non-spilled files in + // finalize. + .apply("GroupSpilled", GroupByKey.<ShardedKey<Integer>, UserT>create()) .apply( - "WriteUnwritten", - ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_IN_FINALIZE)) - .withSideInputs(sideInputs)) - .setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder)); + "WriteSpilled", ParDo.of(new WriteShardedBundles()).withSideInputs(sideInputs)) + .setCoder(fileResultCoder) + .apply("DropShardNum", ParDo.of( + new DoFn<FileResult<DestinationT>, FileResult<DestinationT>>() { + @ProcessElement + public void process(ProcessContext c) { + c.output(c.element().withShard(UNKNOWN_SHARDNUM)); + } + })); results = PCollectionList.of(writtenBundleFiles) - .and(writtenGroupedFiles) + .and(writtenSpilledFiles) .apply(Flatten.<FileResult<DestinationT>>pCollections()); } else { - List<PCollectionView<?>> shardingSideInputs = Lists.newArrayList(); - if (computeNumShards != null) { - numShardsView = input.apply(computeNumShards); - shardingSideInputs.add(numShardsView); - } else { - numShardsView = null; - } - PCollection<KV<ShardedKey<Integer>, Iterable<UserT>>> sharded = + results = input .apply( "ApplyShardLabel", - ParDo.of( - new ApplyShardingKey( - numShardsView, - (numShardsView != null) ? null : numShardsProvider, - destinationCoder)) + ParDo.of(new ApplyShardingKey(numShardsView, numShardsProvider, destinationCoder)) .withSideInputs(shardingSideInputs)) .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder())) - .apply("GroupIntoShards", GroupByKey.<ShardedKey<Integer>, UserT>create()); - shardedWindowCoder = - (Coder<BoundedWindow>) sharded.getWindowingStrategy().getWindowFn().windowCoder(); - // Since this path might be used by streaming runners processing triggers, it's important - // to assign shard numbers here so that they are deterministic. The ASSIGN_IN_FINALIZE - // strategy works by sorting all FileResult objects and assigning them numbers, which is not - // guaranteed to work well when processing triggers - if the finalize step retries it might - // see a different Iterable of FileResult objects, and it will assign different shard numbers. - results = - sharded.apply( - "WriteShardedBundles", - ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_WHEN_WRITING)) - .withSideInputs(sideInputs)); + .apply("GroupIntoShards", GroupByKey.<ShardedKey<Integer>, UserT>create()) + .apply( + "WriteShardedBundles", + ParDo.of(new WriteShardedBundles()).withSideInputs(this.sideInputs)); } - results.setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder)); + results.setCoder(fileResultCoder); PCollection<KV<DestinationT, String>> outputFilenames; if (windowedWrites) { @@ -773,7 +756,7 @@ public class WriteFiles<UserT, DestinationT, OutputT> .apply( "FinalizeWindowed", ParDo.of( - new FinalizeWindowedFn<DestinationT>( + new FinalizeWindowedFn<>( numShardsView, numShardsProvider, writeOperation)) .withSideInputs( numShardsView == null @@ -783,12 +766,6 @@ public class WriteFiles<UserT, DestinationT, OutputT> } else { final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView = results.apply(View.<FileResult<DestinationT>>asIterable()); - ImmutableList.Builder<PCollectionView<?>> finalizeSideInputs = - ImmutableList.<PCollectionView<?>>builder().add(resultsView); - if (numShardsView != null) { - finalizeSideInputs.add(numShardsView); - } - finalizeSideInputs.addAll(sideInputs); // Finalize the write in another do-once ParDo on the singleton collection containing the // Writer. The results from the per-bundle writes are given as an Iterable side input. @@ -806,16 +783,17 @@ public class WriteFiles<UserT, DestinationT, OutputT> ParDo.of( new FinalizeUnwindowedFn<>( numShardsView, numShardsProvider, resultsView, writeOperation)) - .withSideInputs(finalizeSideInputs.build())) + .withSideInputs( + FluentIterable.concat(sideInputs, shardingSideInputs) + .append(resultsView) + .toList())) .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of())); } TupleTag<KV<DestinationT, String>> perDestinationOutputFilenamesTag = new TupleTag<>("perDestinationOutputFilenames"); return WriteFilesResult.in( - input.getPipeline(), - perDestinationOutputFilenamesTag, - outputFilenames); + input.getPipeline(), perDestinationOutputFilenamesTag, outputFilenames); } private static class FinalizeWindowedFn<DestinationT> -- To stop receiving notification emails like this one, please contact "commits@beam.apache.org" <commits@beam.apache.org>.