[ 
https://issues.apache.org/jira/browse/BEAM-8292?focusedWorklogId=409086&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-409086
 ]

ASF GitHub Bot logged work on BEAM-8292:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Mar/20 20:49
            Start Date: 24/Mar/20 20:49
    Worklog Time Spent: 10m 
      Work Description: youngoli commented on pull request #11197: [BEAM-8292] 
Portable Reshuffle for Go SDK
URL: https://github.com/apache/beam/pull/11197#discussion_r397451550
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
 ##########
 @@ -473,6 +479,164 @@ func (m *marshaller) addNode(n *graph.Node) string {
        return m.makeNode(id, m.coders.Add(n.Coder), n)
 }
 
+// expandReshuffle translates resharding to a composite reshuffle
+// transform.
+//
+// With proper runner support, the SDK doesn't need to do anything.
+// However, we still need to provide a backup plan in terms of other
+// PTransforms in the event the runner doesn't have a native implementation.
+//
+// In particular, the "backup plan" needs to:
+//
+//  * Encode the windowed element, preserving timestamps.
+//  * Add random keys to the encoded windowed element []bytes
+//  * GroupByKey (in the global window).
+//  * Explode the resulting elements list.
+//  * Decode the windowed element []bytes.
+//
+// While a simple reshard can be written in user terms, (timestamps and windows
+// are accessible to user functions) there are some framework internal
+// optimizations that can be done if the framework is aware of the reshard, though
+// ideally this is handled on the runner side.
+//
+// User code is able to write reshards, but it's easier to access
+// the window coders framework side, which is critical for the reshard
+// to function with unbounded inputs.
+func (m *marshaller) expandReshuffle(edge NamedEdge) string {
+       id := edgeID(edge.Edge)
+       var kvCoderID, gbkCoderID string
+       {
+               kv := makeUnionCoder()
+               kvCoderID = m.coders.Add(kv)
+               gbkCoderID = m.coders.Add(coder.NewCoGBK(kv.Components))
+       }
+
+       var subtransforms []string
+
+       in := edge.Edge.Input[0]
+
+       origInput := m.addNode(in.From)
+       // We need to preserve the old windowing/triggering here
+       // for re-instatement after the GBK.
+       preservedWSId := m.pcollections[origInput].GetWindowingStrategyId()
+
+       // Get the windowing strategy from before:
+       postReify := fmt.Sprintf("%v_%v_reifyts", nodeID(in.From), id)
+       m.makeNode(postReify, kvCoderID, in.From)
+
+       // We need to replace postReify's windowing strategy with one appropriate
+       // for reshuffles.
+       {
+               wfn := window.NewGlobalWindows()
+               m.pcollections[postReify].WindowingStrategyId =
+                       m.internWindowingStrategy(&pb.WindowingStrategy{
+                               // Not segregated by time...
+                               WindowFn: makeWindowFn(wfn),
+                               // ...output after every element is received...
+                               Trigger: &pb.Trigger{
+                                       // Should this be an Always trigger instead?
+                                       Trigger: &pb.Trigger_ElementCount_{
+                                               ElementCount: 
&pb.Trigger_ElementCount{
+                                                       ElementCount: 1,
+                                               },
+                                       },
+                               },
+                               // ...and after outputting, discard the output elements...
+                               AccumulationMode: 
pb.AccumulationMode_DISCARDING,
+                               // ...and since every pane should have 1 element,
+                               // try to preserve the timestamp.
+                               OutputTime: pb.OutputTime_EARLIEST_IN_PANE,
+                               // Defaults copied from marshalWindowingStrategy.
+                               // TODO(BEAM-3304): migrate to user side operations once trigger support is in.
+                               EnvironmentId:   m.addDefaultEnv(),
+                               MergeStatus:     pb.MergeStatus_NON_MERGING,
+                               WindowCoderId:   
m.coders.AddWindowCoder(makeWindowCoder(wfn)),
+                               ClosingBehavior: 
pb.ClosingBehavior_EMIT_IF_NONEMPTY,
+                               AllowedLateness: 0,
+                               OnTimeBehavior:  pb.OnTimeBehavior_FIRE_ALWAYS,
+                       })
+       }
+
+       // Inputs (i)
+
+       inputID := fmt.Sprintf("%v_reifyts", id)
+       payload := &pb.ParDoPayload{
+               DoFn: &pb.FunctionSpec{
+                       Urn: URNReshuffleInput,
+                       Payload: 
[]byte(protox.MustEncodeBase64(&v1.TransformPayload{
+                               Urn: URNReshuffleInput,
+                       })),
+               },
+       }
+       input := &pb.PTransform{
+               UniqueName: inputID,
+               Spec: &pb.FunctionSpec{
+                       Urn:     URNParDo,
+                       Payload: protox.MustEncode(payload),
+               },
+               Inputs:        map[string]string{"i0": nodeID(in.From)},
+               Outputs:       map[string]string{"i0": postReify},
+               EnvironmentId: m.addDefaultEnv(),
+       }
+       m.transforms[inputID] = input
+       subtransforms = append(subtransforms, inputID)
+
+       outNode := edge.Edge.Output[0].To
+
+       // GBK
+
+       gbkOut := fmt.Sprintf("%v_out", nodeID(outNode))
+       m.makeNode(gbkOut, gbkCoderID, outNode)
+
+       gbkID := fmt.Sprintf("%v_gbk", id)
+       gbk := &pb.PTransform{
+               UniqueName: gbkID,
+               Spec:       &pb.FunctionSpec{Urn: URNGBK},
+               Inputs:     map[string]string{"i0": postReify},
 
 Review comment:
   Oh ok, I see it now. I was completely misinterpreting the expansion here. So 
if I understand correctly, it should look like this, right? (With 
pcollections/nodes in square brackets and transforms/edges as arrows)
   
   ```
   [in.From] ---input---> [postReify] ---gbk---> [gbkOut] ---output---> [out.To]
   ```
   
   Where `input` and `output` are the newly added Reshuffle transforms. That 
looks right to me, and on rereading the code, it looks consistent with that.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 409086)
    Time Spent: 3h 10m  (was: 3h)

> Add a Reshuffle PTransform preventing fusion of the surrounding transforms
> --------------------------------------------------------------------------
>
>                 Key: BEAM-8292
>                 URL: https://issues.apache.org/jira/browse/BEAM-8292
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-go
>            Reporter: John Patoch
>            Assignee: Robert Burke
>            Priority: Minor
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> Reshuffle is a PTransform that takes a PCollection<A> and shuffles the data 
> to help increase parallelism.
> Reshuffle adds a temporary random key to each element, performs a
>  GroupByKey, and finally removes the temporary key.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to