[ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=79712&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79712
 ]

ASF GitHub Bot logged work on BEAM-3565:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Mar/18 02:33
            Start Date: 13/Mar/18 02:33
    Worklog Time Spent: 10m 
      Work Description: tgroh commented on a change in pull request #4844: 
[BEAM-3565] Add ExecutableStagePayload to simplify runner stage reconstruction
URL: https://github.com/apache/beam/pull/4844#discussion_r174001680
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
 ##########
 @@ -84,64 +81,72 @@
    * follows:
    *
    * <ul>
-   *   <li>The {@link PTransform#getSubtransformsList()} contains the result of {@link
-   *       #getTransforms()}.
+   *   <li>The {@link PTransform#getSubtransformsList()} contains no subtransforms. This ensures
+   *       that executable stages are treated as primitive transforms.
    *   <li>The only {@link PCollection} in the {@link PTransform#getInputsMap()} is the result of
    *       {@link #getInputPCollection()}.
    *   <li>The output {@link PCollection PCollections} in the values of {@link
    *       PTransform#getOutputsMap()} are the {@link PCollectionNode PCollections} returned by
    *       {@link #getOutputPCollections()}.
+   *   <li>The {@link FunctionSpec} contains an {@link ExecutableStagePayload} which has its input
+   *       and output PCollections set to the same values as the outer PTransform itself. It further
+   *       contains the environment and set of transforms for this stage.
    * </ul>
+   *
+   * <p>The executable stage can be reconstructed from the resulting {@link ExecutableStagePayload}
+   * and components alone via {@link #fromPayload(ExecutableStagePayload, Components)}.
    */
   default PTransform toPTransform() {
+    ExecutableStagePayload.Builder payload = ExecutableStagePayload.newBuilder();
+
+    payload.setEnvironment(getEnvironment());
+
+    PCollectionNode input = getInputPCollection();
+    payload.setInput(input.getId());
+
+    for (PTransformNode transform : getTransforms()) {
+      payload.addTransforms(transform.getId());
+    }
+
+    for (PCollectionNode output : getOutputPCollections()) {
+      payload.addOutputs(output.getId());
+    }
+
     PTransform.Builder pt = PTransform.newBuilder();
+    pt.setSpec(FunctionSpec.newBuilder()
+        .setUrn(ExecutableStage.URN)
+        .setPayload(payload.build().toByteString())
+        .build());
     pt.putInputs("input", getInputPCollection().getId());
-    int i = 0;
-    for (PCollectionNode materializedPCollection : getOutputPCollections()) {
-      pt.putOutputs(String.format("materialized_%s", i), materializedPCollection.getId());
-      i++;
-    }
-    for (PTransformNode fusedTransform : getTransforms()) {
-      pt.addSubtransforms(fusedTransform.getId());
+    int outputIndex = 0;
+    for (PCollectionNode pcNode : getOutputPCollections()) {
+      // Key each output by its position in getOutputPCollections().
+      pt.putOutputs(String.format("materialized_%d", outputIndex), pcNode.getId());
+      outputIndex++;
     }
-    pt.setSpec(FunctionSpec.newBuilder().setUrn(ExecutableStage.URN));
     return pt.build();
   }
 
+  // TODO: Should this live under ExecutableStageTranslation?
 
 Review comment:
   Regardless of whether it should or shouldn't, you should have a JIRA to determine it.
   
   For what it's worth, I think the toProto and fromProto methods should live together, and I think this is a totally reasonable place for them. This class is already involved with the proto representation of the pipeline, so moving them elsewhere would not give us significant separation between the Java and proto representations.
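
   To illustrate what keeping the two directions together looks like, here is a minimal sketch using only the JDK. StageSketch and its nested Payload are hypothetical stand-ins, not Beam's ExecutableStage or ExecutableStagePayload; the only point is that toPayload and fromPayload sit in the same class, so a change to one is made next to the other.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/** Hypothetical stand-in for a fused stage; not Beam's ExecutableStage. */
final class StageSketch {
  /** Hypothetical stand-in for the serialized payload: IDs only. */
  static final class Payload {
    final String environment;
    final String input;
    final List<String> transforms;
    final List<String> outputs;

    Payload(String environment, String input, List<String> transforms, List<String> outputs) {
      this.environment = environment;
      this.input = input;
      this.transforms = Collections.unmodifiableList(new ArrayList<>(transforms));
      this.outputs = Collections.unmodifiableList(new ArrayList<>(outputs));
    }
  }

  private final String environment;
  private final String inputId;
  private final List<String> transformIds;
  private final List<String> outputIds;

  StageSketch(String environment, String inputId, List<String> transformIds, List<String> outputIds) {
    this.environment = environment;
    this.inputId = inputId;
    this.transformIds = Collections.unmodifiableList(new ArrayList<>(transformIds));
    this.outputIds = Collections.unmodifiableList(new ArrayList<>(outputIds));
  }

  /** Serializing direction: flatten the stage into its payload. */
  Payload toPayload() {
    return new Payload(environment, inputId, transformIds, outputIds);
  }

  /** Deserializing direction, kept beside toPayload so the two stay in sync. */
  static StageSketch fromPayload(Payload payload) {
    return new StageSketch(
        payload.environment, payload.input, payload.transforms, payload.outputs);
  }

  @Override
  public boolean equals(Object other) {
    if (!(other instanceof StageSketch)) {
      return false;
    }
    StageSketch that = (StageSketch) other;
    return environment.equals(that.environment)
        && inputId.equals(that.inputId)
        && transformIds.equals(that.transformIds)
        && outputIds.equals(that.outputIds);
  }

  @Override
  public int hashCode() {
    return Objects.hash(environment, inputId, transformIds, outputIds);
  }
}
```

   A round trip through StageSketch.fromPayload(stage.toPayload()) yields a distinct object that is equal to the original, which is the property a co-located pair makes easy to test in one place.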

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 79712)
    Time Spent: 12h 10m  (was: 12h)

> Add utilities for producing a collection of PTransforms that can execute in a 
> single SDK Harness
> ------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-3565
>                 URL: https://issues.apache.org/jira/browse/BEAM-3565
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>            Reporter: Thomas Groh
>            Assignee: Thomas Groh
>            Priority: Major
>              Labels: portability
>             Fix For: 2.4.0
>
>          Time Spent: 12h 10m
>  Remaining Estimate: 0h
>
> An SDK Harness executes some ("fused") collection of PTransforms. The java 
> runner libraries should provide some way to take a Pipeline that executes in 
> both a runner and an environment and construct a collection of transforms 
> which can execute within a single environment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
