lukecwik commented on a change in pull request #12371:
URL: https://github.com/apache/beam/pull/12371#discussion_r464580334



##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SplittableParDoExpander.java
##########
@@ -214,39 +222,124 @@ public MessageWithComponents getReplacement(
             generateUniqueId(
                 transformId + "/ProcessSizedElementsAndRestrictions",
                 existingComponents::containsTransforms);
-        {
-          PTransform.Builder processSizedElementsAndRestrictions = PTransform.newBuilder();
-          processSizedElementsAndRestrictions.putInputs(mainInputName, splitAndSizeOutId);
-          processSizedElementsAndRestrictions.putAllInputs(sideInputs);
-          processSizedElementsAndRestrictions.putAllOutputs(splittableParDo.getOutputsMap());
-          processSizedElementsAndRestrictions.setUniqueName(
-              generateUniquePCollectonName(
-                  splittableParDo.getUniqueName() + "/ProcessSizedElementsAndRestrictions",
-                  existingComponents));
-          processSizedElementsAndRestrictions.setSpec(
-              FunctionSpec.newBuilder()
-                  .setUrn(
-                      PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN)
-                  .setPayload(splittableParDo.getSpec().getPayload()));
-          processSizedElementsAndRestrictions.setEnvironmentId(splittableParDo.getEnvironmentId());
+        if (!isDrain) {

Review comment:
       Should we be able to simplify this further, with something like the following?
   ```
   PTransform.Builder newCompositeRoot = ... add pair w/ restriction and split and size ...
   String processSizedElementsInputPCollectionId = splitAndSizeOutId;
   if (isDrain) {
     ... add drain transform ...
     newCompositeRoot.add drain transform
     processSizedElementsInputPCollectionId = truncateAndSizeOutId;
   }
   ... add process sized elements transform ...
   newCompositeRoot.add process sized transform
   ```
   
   I don't think we should need two copies of the code that adds `ProcessSizedElementsAndRestrictions`.
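
To make the shape of that suggestion concrete, here is a minimal, self-contained sketch. It is not the Beam code: `ExpansionSketch`, `expand`, and the string names are hypothetical stand-ins for the real `PTransform.Builder` calls. The only point is that the drain branch adds one extra step and rewires the input, so the process step is added exactly once.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the expansion logic; none of these names are Beam APIs.
final class ExpansionSketch {

  /** Returns the sub-transform names added to the composite, in order. */
  static List<String> expand(boolean isDrain) {
    List<String> subTransforms = new ArrayList<>();

    // Steps shared by both the normal and the drain expansion.
    subTransforms.add("PairWithRestriction");
    subTransforms.add("SplitAndSize");

    // By default the process step consumes the output of split-and-size.
    String processInput = "splitAndSizeOut";

    if (isDrain) {
      // Drain adds one extra step and redirects the process step's input to it.
      subTransforms.add("TruncateAndSize");
      processInput = "truncateAndSizeOut";
    }

    // The process step is added in exactly one place for both modes.
    subTransforms.add("ProcessSizedElementsAndRestrictions(input=" + processInput + ")");
    return subTransforms;
  }

  public static void main(String[] args) {
    System.out.println(expand(false));
    System.out.println(expand(true));
  }
}
```

Note that the input id is declared once before the `if` and only reassigned inside it; redeclaring it inside the block would not compile in Java.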
   

##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SplittableParDoExpander.java
##########
@@ -91,13 +91,21 @@ public static TransformReplacement createSizedReplacement() {
    * .
    */
   public static TransformReplacement createTruncateReplacement() {
-    return TruncateReplacement.INSTANCE;
+    return SizedReplacement.DRAIN_INSTANCE;
   }
 
   /** See {@link #createSizedReplacement()} for details. */
   private static class SizedReplacement implements TransformReplacement {
 
     private static final SizedReplacement INSTANCE = new SizedReplacement();
+    private static final SizedReplacement DRAIN_INSTANCE = new SizedReplacement().withDrain();
+
+    private boolean isDrain = false;

Review comment:
       It would be nice if we could make this final by using `@AutoBuilder`, allowing for:
   ```suggestion
       private final boolean isDrain;
   ```
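
For illustration only, the same immutability can be sketched without an annotation processor by passing the flag through a private constructor. `SizedReplacementSketch` is a hypothetical stand-in, not the class in this PR, and a generated builder would replace the hand-written constructor.

```java
// Hypothetical stand-in for SizedReplacement; shows the flag as a final field
// set once at construction instead of being mutated by withDrain().
final class SizedReplacementSketch {

  static final SizedReplacementSketch INSTANCE = new SizedReplacementSketch(false);
  static final SizedReplacementSketch DRAIN_INSTANCE = new SizedReplacementSketch(true);

  private final boolean isDrain;

  private SizedReplacementSketch(boolean isDrain) {
    this.isDrain = isDrain;
  }

  boolean isDrain() {
    return isDrain;
  }

  public static void main(String[] args) {
    System.out.println(INSTANCE.isDrain());
    System.out.println(DRAIN_INSTANCE.isDrain());
  }
}
```

Either way, the two shared instances can no longer be changed after they are created.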





