This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 54eacf4b79993f40b9034bf429e387faeffdbdba
Author: Eugene Kirpichov <kirpic...@google.com>
AuthorDate: Wed Nov 15 18:58:28 2017 -0800

    remove ShardAssignment
---
 .../java/org/apache/beam/sdk/io/WriteFiles.java    | 118 +++++++++------------
 1 file changed, 48 insertions(+), 70 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 19457e6..28ac1a5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -24,6 +24,7 @@ import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.base.Objects;
 import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
@@ -478,19 +479,12 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     }
   }
 
-  enum ShardAssignment { ASSIGN_IN_FINALIZE, ASSIGN_WHEN_WRITING }
-
   /*
   * Like {@link WriteBundles}, but where the elements for each shard have been collected into a
    * single iterable.
    */
   private class WriteShardedBundles
       extends DoFn<KV<ShardedKey<Integer>, Iterable<UserT>>, FileResult<DestinationT>> {
-    ShardAssignment shardNumberAssignment;
-    WriteShardedBundles(ShardAssignment shardNumberAssignment) {
-      this.shardNumberAssignment = shardNumberAssignment;
-    }
-
     @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
       sink.getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
@@ -527,13 +521,8 @@ public class WriteFiles<UserT, DestinationT, OutputT>
           writer.cleanup();
           throw e;
         }
-        int shardNumber =
-            shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING
-                ? c.element().getKey().getShardNumber()
-                : UNKNOWN_SHARDNUM;
-        c.output(
-            new FileResult<>(
-                writer.getOutputFile(), shardNumber, window, c.pane(), entry.getKey()));
+        int shard = c.element().getKey().getShardNumber();
+        c.output(new FileResult<>(writer.getOutputFile(), shard, window, c.pane(), entry.getKey()));
       }
     }
 
@@ -672,8 +661,11 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     // PCollection. There is a dependency between this ParDo and the first (the
     // WriteOperation PCollection as a side input), so this will happen after the
     // initial ParDo.
-    PCollection<FileResult<DestinationT>> results;
-    final PCollectionView<Integer> numShardsView;
+    PCollectionView<Integer> numShardsView =
+        (computeNumShards == null) ? null : input.apply(computeNumShards);
+    List<PCollectionView<Integer>> shardingSideInputs = numShardsView == null
+        ? ImmutableList.<PCollectionView<Integer>>of()
+        : ImmutableList.of(numShardsView);
     @SuppressWarnings("unchecked")
     Coder<BoundedWindow> shardedWindowCoder =
         (Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder();
@@ -686,74 +678,65 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     } catch (CannotProvideCoderException | NonDeterministicException e) {
       throw new RuntimeException(e);
     }
+    FileResultCoder<DestinationT> fileResultCoder =
+        FileResultCoder.of(shardedWindowCoder, destinationCoder);
 
+    PCollection<FileResult<DestinationT>> results;
     if (computeNumShards == null && numShardsProvider == null) {
-      numShardsView = null;
       TupleTag<FileResult<DestinationT>> writtenRecordsTag =
           new TupleTag<>("writtenRecordsTag");
-      TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittedRecordsTag =
-          new TupleTag<>("unwrittenRecordsTag");
+      TupleTag<KV<ShardedKey<Integer>, UserT>> spilledRecordsTag =
+          new TupleTag<>("spilledRecordsTag");
       String writeName = windowedWrites ? "WriteWindowedBundles" : "WriteBundles";
       PCollectionTuple writeTuple =
           input.apply(
               writeName,
-              ParDo.of(new WriteBundles(unwrittedRecordsTag, destinationCoder))
+              ParDo.of(new WriteBundles(spilledRecordsTag, destinationCoder))
                   .withSideInputs(sideInputs)
-                  .withOutputTags(writtenRecordsTag, TupleTagList.of(unwrittedRecordsTag)));
+                  .withOutputTags(writtenRecordsTag, TupleTagList.of(spilledRecordsTag)));
       PCollection<FileResult<DestinationT>> writtenBundleFiles =
-          writeTuple
-              .get(writtenRecordsTag)
-              .setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder));
+          writeTuple.get(writtenRecordsTag).setCoder(fileResultCoder);
       // Any "spilled" elements are written using WriteShardedBundles. Assign shard numbers in
       // finalize to stay consistent with what WriteWindowedBundles does.
-      PCollection<FileResult<DestinationT>> writtenGroupedFiles =
+      PCollection<FileResult<DestinationT>> writtenSpilledFiles =
           writeTuple
-              .get(unwrittedRecordsTag)
+              .get(spilledRecordsTag)
               .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
-              .apply("GroupUnwritten", GroupByKey.<ShardedKey<Integer>, UserT>create())
+              // Here we group by a synthetic shard number in the range [0, spill factor),
+              // just for the sake of getting some parallelism within each destination when
+              // writing the spilled records, whereas the non-spilled records don't have a shard
+              // number assigned at all. Drop the shard number on the spilled records so that
+              // shard numbers are assigned together to both the spilled and non-spilled files in
+              // finalize.
+              .apply("GroupSpilled", GroupByKey.<ShardedKey<Integer>, UserT>create())
               .apply(
-                  "WriteUnwritten",
-                  ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_IN_FINALIZE))
-                      .withSideInputs(sideInputs))
-              .setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder));
+                  "WriteSpilled", ParDo.of(new WriteShardedBundles()).withSideInputs(sideInputs))
+              .setCoder(fileResultCoder)
+              .apply("DropShardNum", ParDo.of(
+                  new DoFn<FileResult<DestinationT>, FileResult<DestinationT>>() {
+                    @ProcessElement
+                    public void process(ProcessContext c) {
+                      c.output(c.element().withShard(UNKNOWN_SHARDNUM));
+                    }
+                  }));
       results =
           PCollectionList.of(writtenBundleFiles)
-              .and(writtenGroupedFiles)
+              .and(writtenSpilledFiles)
               .apply(Flatten.<FileResult<DestinationT>>pCollections());
     } else {
-      List<PCollectionView<?>> shardingSideInputs = Lists.newArrayList();
-      if (computeNumShards != null) {
-        numShardsView = input.apply(computeNumShards);
-        shardingSideInputs.add(numShardsView);
-      } else {
-        numShardsView = null;
-      }
-      PCollection<KV<ShardedKey<Integer>, Iterable<UserT>>> sharded =
+      results =
           input
               .apply(
                   "ApplyShardLabel",
-                  ParDo.of(
-                          new ApplyShardingKey(
-                              numShardsView,
-                              (numShardsView != null) ? null : numShardsProvider,
-                              destinationCoder))
+                  ParDo.of(new ApplyShardingKey(numShardsView, numShardsProvider, destinationCoder))
                       .withSideInputs(shardingSideInputs))
               .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
-              .apply("GroupIntoShards", GroupByKey.<ShardedKey<Integer>, UserT>create());
-      shardedWindowCoder =
-          (Coder<BoundedWindow>) sharded.getWindowingStrategy().getWindowFn().windowCoder();
-      // Since this path might be used by streaming runners processing triggers, it's important
-      // to assign shard numbers here so that they are deterministic. The ASSIGN_IN_FINALIZE
-      // strategy works by sorting all FileResult objects and assigning them numbers, which is not
-      // guaranteed to work well when processing triggers - if the finalize step retries it might
-      // see a different Iterable of FileResult objects, and it will assign different shard numbers.
-      results =
-          sharded.apply(
-              "WriteShardedBundles",
-              ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_WHEN_WRITING))
-                  .withSideInputs(sideInputs));
+              .apply("GroupIntoShards", GroupByKey.<ShardedKey<Integer>, UserT>create())
+              .apply(
+                  "WriteShardedBundles",
+                  ParDo.of(new WriteShardedBundles()).withSideInputs(this.sideInputs));
     }
-    results.setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder));
+    results.setCoder(fileResultCoder);
 
     PCollection<KV<DestinationT, String>> outputFilenames;
     if (windowedWrites) {
@@ -773,7 +756,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
               .apply(
                   "FinalizeWindowed",
                   ParDo.of(
-                          new FinalizeWindowedFn<DestinationT>(
+                          new FinalizeWindowedFn<>(
                               numShardsView, numShardsProvider, writeOperation))
                       .withSideInputs(
                           numShardsView == null
@@ -783,12 +766,6 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     } else {
       final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView =
           results.apply(View.<FileResult<DestinationT>>asIterable());
-      ImmutableList.Builder<PCollectionView<?>> finalizeSideInputs =
-          ImmutableList.<PCollectionView<?>>builder().add(resultsView);
-      if (numShardsView != null) {
-        finalizeSideInputs.add(numShardsView);
-      }
-      finalizeSideInputs.addAll(sideInputs);
 
       // Finalize the write in another do-once ParDo on the singleton collection containing the
       // Writer. The results from the per-bundle writes are given as an Iterable side input.
@@ -806,16 +783,17 @@ public class WriteFiles<UserT, DestinationT, OutputT>
                   ParDo.of(
                           new FinalizeUnwindowedFn<>(
                               numShardsView, numShardsProvider, resultsView, writeOperation))
-                      .withSideInputs(finalizeSideInputs.build()))
+                      .withSideInputs(
+                          FluentIterable.concat(sideInputs, shardingSideInputs)
+                              .append(resultsView)
+                              .toList()))
               .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of()));
     }
 
     TupleTag<KV<DestinationT, String>> perDestinationOutputFilenamesTag =
         new TupleTag<>("perDestinationOutputFilenames");
     return WriteFilesResult.in(
-        input.getPipeline(),
-        perDestinationOutputFilenamesTag,
-        outputFilenames);
+        input.getPipeline(), perDestinationOutputFilenamesTag, outputFilenames);
   }
 
   private static class FinalizeWindowedFn<DestinationT>

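For readers skimming the diff, the net effect of removing ShardAssignment is
that WriteShardedBundles always records the grouping key's shard number, and
the spilled-records path then erases it again ("DropShardNum") so that
finalize assigns shard numbers uniformly across spilled and non-spilled files.
A minimal, Beam-free sketch of that erase step (FileResult, withShard, and
UNKNOWN_SHARDNUM mirror names in WriteFiles.java; the simplified types and
the -1 sentinel are illustrative assumptions, not Beam's real API):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class ShardFlowSketch {
      // Sentinel for "no shard assigned yet"; assumed to be a negative value.
      static final int UNKNOWN_SHARDNUM = -1;

      // Stand-in for Beam's FileResult: immutable, copied via withShard().
      static class FileResult {
        final String file;
        final int shard;

        FileResult(String file, int shard) {
          this.file = file;
          this.shard = shard;
        }

        FileResult withShard(int shard) {
          return new FileResult(file, shard);
        }

        @Override
        public String toString() {
          return file + "#" + shard;
        }
      }

      public static void main(String[] args) {
        // Spilled records are grouped under synthetic shard numbers purely to
        // get parallelism; the writer records whatever shard the key carried.
        List<FileResult> spilled =
            Arrays.asList(new FileResult("spill-a", 3), new FileResult("spill-b", 7));

        // "DropShardNum": erase the synthetic numbers so finalize can number
        // spilled and non-spilled files together.
        List<FileResult> dropped = new ArrayList<>();
        for (FileResult r : spilled) {
          dropped.add(r.withShard(UNKNOWN_SHARDNUM));
        }
        System.out.println(dropped); // prints [spill-a#-1, spill-b#-1]
      }
    }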
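The finalize side inputs are likewise assembled with Guava's FluentIterable in
place of the removed ImmutableList.Builder. A standalone sketch of that idiom,
with String standing in for the PCollectionView element type (the variable
names mirror the diff; the values are made up for illustration):

    import com.google.common.collect.FluentIterable;
    import com.google.common.collect.ImmutableList;
    import java.util.List;

    public class ConcatSideInputsSketch {
      public static void main(String[] args) {
        List<String> sideInputs = ImmutableList.of("userSideInput");
        // Empty when numShardsView == null, a singleton list otherwise.
        List<String> shardingSideInputs = ImmutableList.of();
        String resultsView = "resultsView";

        // concat() chains the two lists, append() adds the results view, and
        // toList() materializes an ImmutableList for withSideInputs().
        List<String> finalizeSideInputs =
            FluentIterable.concat(sideInputs, shardingSideInputs)
                .append(resultsView)
                .toList();

        System.out.println(finalizeSideInputs); // [userSideInput, resultsView]
      }
    }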
-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <commits@beam.apache.org>.
