youngoli commented on a change in pull request #13325:
URL: https://github.com/apache/beam/pull/13325#discussion_r523383268



##########
File path: sdks/go/pkg/beam/core/runtime/graphx/xlang.go
##########
@@ -16,11 +16,222 @@
 package graphx
 
 import (
+       "fmt"
+
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 )
 
+// MergeExpandedWithPipeline adds expanded components of all 
ExternalTransforms to the existing pipeline
+func MergeExpandedWithPipeline(edges []*graph.MultiEdge, p *pipepb.Pipeline) {

Review comment:
       Done.

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/xlang.go
##########
@@ -16,11 +16,222 @@
 package graphx
 
 import (
+       "fmt"
+
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 )
 
+// MergeExpandedWithPipeline adds expanded components of all 
ExternalTransforms to the existing pipeline
+func MergeExpandedWithPipeline(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
+       // Adding Expanded transforms to their counterparts in the Pipeline
+
+       for _, e := range edges {
+               if e.Op == graph.External {
+                       exp := e.External.Expanded
+                       if exp == nil {
+                               continue
+                       }
+                       id := fmt.Sprintf("e%v", e.ID())
+
+                       p.Requirements = append(p.Requirements, 
exp.Requirements...)
+
+                       // Adding components of the Expanded Transforms to the 
current Pipeline
+                       components, err := ExpandedComponents(exp)
+                       if err != nil {
+                               panic(err)
+                       }
+                       for k, v := range components.GetTransforms() {
+                               p.Components.Transforms[k] = v
+                       }
+                       for k, v := range components.GetPcollections() {
+                               p.Components.Pcollections[k] = v
+                       }
+                       for k, v := range components.GetWindowingStrategies() {
+                               p.Components.WindowingStrategies[k] = v
+                       }
+                       for k, v := range components.GetCoders() {
+                               p.Components.Coders[k] = v
+                       }
+                       for k, v := range components.GetEnvironments() {
+                               if k == "go" {
+                                       // This case is not an anomaly. It is 
expected to be always
+                                       // present. Any initial 
ExpansionRequest will have a
+                                       // component which requires the "go" 
environment. Scoping
+                                       // using unique namespace prevents 
collision.
+                                       continue
+                               }
+                               p.Components.Environments[k] = v
+                       }
+
+                       transform, err := ExpandedTransform(exp)
+                       if err != nil {
+                               panic(err)
+                       }
+                       p.Components.Transforms[id] = transform
+               }
+       }
+}
+
+// PurgeOutputInput remaps outputs from edge corresponding to an
+// ExternalTransform with the correct expanded outputs. All consumers of the
+// previous outputs are updated with new inputs.
+func PurgeOutputInput(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
+       idxMap := make(map[string]string)
+       components := p.GetComponents()
+
+       // Generating map (oldID -> newID) of outputs to be purged
+       for _, e := range edges {
+               if e.Op == graph.External {
+                       if e.External.Expanded == nil {
+                               continue
+                       }
+                       for tag, n := range ExternalOutputs(e) {
+                               nodeID := fmt.Sprintf("n%v", n.ID())
+
+                               transform, err := 
ExpandedTransform(e.External.Expanded)
+                               if err != nil {
+                                       panic(err)
+                               }
+                               expandedOutputs := transform.GetOutputs()
+                               var pcolID string
+                               if tag == graph.UnnamedOutputTag {
+                                       for _, pcolID = range expandedOutputs {
+                                               // easiest way to access map 
with one entry (key,value)
+                                       }
+                               } else {
+                                       pcolID = expandedOutputs[tag]
+                               }
+
+                               idxMap[nodeID] = pcolID
+                               delete(components.Pcollections, nodeID)
+                       }
+               }
+       }
+
+       // Updating all input ids to reflect the correct sources
+       for _, t := range components.GetTransforms() {
+               inputs := t.GetInputs()
+               for tag, nodeID := range inputs {
+                       if pcolID, exists := idxMap[nodeID]; exists {
+                               inputs[tag] = pcolID
+                       }
+               }
+       }
+
+}
+
+// VerifyNamedOutputs ensures the expanded outputs correspond to the correct 
and expected named outputs
+func VerifyNamedOutputs(ext *graph.ExternalTransform) {
+       transform, err := ExpandedTransform(ext.Expanded)
+       if err != nil {
+               panic(err)
+       }
+       expandedOutputs := transform.GetOutputs()
+
+       if len(expandedOutputs) != len(ext.OutputsMap) {
+               panic(errors.Errorf("mismatched number of named 
outputs:\nreceived - %v\nexpected - %v", len(expandedOutputs), 
len(ext.OutputsMap)))
+       }
+
+       for tag := range ext.OutputsMap {
+               _, exists := expandedOutputs[tag]
+               if tag != graph.UnnamedOutputTag && !exists {
+                       panic(errors.Errorf("missing named output in expanded 
transform: %v is expected in %v", tag, expandedOutputs))
+               }
+               if tag == graph.UnnamedOutputTag && len(expandedOutputs) > 1 {
+                       panic(errors.Errorf("mismatched number of unnamed 
outputs:\nreceived - %v\nexpected - 1", len(expandedOutputs)))
+               }
+       }
+}
+
+// ResolveOutputIsBounded updates each Output node with respect to the received
+// expanded components to reflect if it is bounded or not
+func ResolveOutputIsBounded(e *graph.MultiEdge, isBoundedUpdater 
func(*graph.Node, bool)) {
+       ext := e.External
+       exp := ext.Expanded
+       components, err := ExpandedComponents(exp)
+       if err != nil {
+               panic(err)
+       }
+       expandedPCollections := components.GetPcollections()
+       transform, err := ExpandedTransform(exp)
+       if err != nil {
+               panic(err)
+       }
+       expandedOutputs := transform.GetOutputs()
+
+       for tag, node := range ExternalOutputs(e) {
+               var id string
+               isBounded := true
+
+               switch tag {
+               case graph.UnnamedOutputTag:
+                       for _, id = range expandedOutputs {
+                               // easiest way to access map with one entry 
(key,value)
+                       }
+               default:
+                       id = expandedOutputs[tag]
+               }
+
+               if pcol, exists := expandedPCollections[id]; exists {
+                       if pcol.GetIsBounded() == pipepb.IsBounded_UNBOUNDED {
+                               isBounded = false
+                       }
+                       isBoundedUpdater(node, isBounded)
+               } else {
+                       panic(errors.Errorf("missing corresponsing pcollection 
of named output: %v is expected in %v", id, expandedPCollections))
+               }
+
+       }
+}
+
+// AddFakeImpulses adds an impulse transform as the producer for each input to
+// the root transform. Inputs need producers to form a correct pipeline.
+func AddFakeImpulses(p *pipepb.Pipeline) {
+       // For a pipeline consisting of only the external node edge, there will be
+       // a single root transform, which will be the external transform.
+       // Adding fake impulses per input to this external transform
+       transforms := p.GetComponents().GetTransforms()
+       ext := transforms[p.GetRootTransformIds()[0]]
+
+       for tag, id := range ext.GetInputs() {
+               key := fmt.Sprintf("%s_%s", "impulse", tag)
+               output := map[string]string{"out": id}
+
+               impulse := &pipepb.PTransform{
+                       UniqueName: key,
+                       Spec: &pipepb.FunctionSpec{
+                               Urn: URNImpulse,
+                       },
+                       Outputs: output,
+               }
+
+               transforms[key] = impulse
+       }
+
+}
+
+// RemoveFakeImpulses removes each fake impulse per input to the transform.
+// Multiple producers for one Input cannot be present.
+func RemoveFakeImpulses(c *pipepb.Components, ext *pipepb.PTransform) {
+       transforms := c.GetTransforms()
+       var impulseIDs []string
+
+       for tag := range ext.GetInputs() {
+               id := fmt.Sprintf("%s_%s", "impulse", tag)
+               impulseIDs = append(impulseIDs, id)
+       }
+
+       for _, id := range impulseIDs {
+               t := transforms[id]
+               if t.GetSpec().GetUrn() == URNImpulse {
+                       delete(transforms, id)
+
+               }
+       }
+}
+
 // ExpandedComponents type asserts the Components field with interface{} type
 // and returns its pipeline component proto representation
 func ExpandedComponents(exp *graph.ExpandedTransform) (*pipepb.Components, 
error) {

Review comment:
       This one specifically is still used outside the package (in 
`xlangx.ResolveArtifacts`). The `ExternalOutputs` helper function is also used 
outside the package. And since some of them are used outside, I feel like all 
four of these helpers should be exported, just for consistency.

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/xlang.go
##########
@@ -16,11 +16,222 @@
 package graphx
 
 import (
+       "fmt"
+
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 )
 
+// MergeExpandedWithPipeline adds expanded components of all 
ExternalTransforms to the existing pipeline
+func MergeExpandedWithPipeline(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
+       // Adding Expanded transforms to their counterparts in the Pipeline
+
+       for _, e := range edges {
+               if e.Op == graph.External {
+                       exp := e.External.Expanded
+                       if exp == nil {
+                               continue
+                       }
+                       id := fmt.Sprintf("e%v", e.ID())
+
+                       p.Requirements = append(p.Requirements, 
exp.Requirements...)
+
+                       // Adding components of the Expanded Transforms to the 
current Pipeline
+                       components, err := ExpandedComponents(exp)
+                       if err != nil {
+                               panic(err)
+                       }
+                       for k, v := range components.GetTransforms() {
+                               p.Components.Transforms[k] = v
+                       }
+                       for k, v := range components.GetPcollections() {
+                               p.Components.Pcollections[k] = v
+                       }
+                       for k, v := range components.GetWindowingStrategies() {
+                               p.Components.WindowingStrategies[k] = v
+                       }
+                       for k, v := range components.GetCoders() {
+                               p.Components.Coders[k] = v
+                       }
+                       for k, v := range components.GetEnvironments() {
+                               if k == "go" {

Review comment:
       Makes sense. Done.

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/xlang.go
##########
@@ -16,11 +16,222 @@
 package graphx
 
 import (
+       "fmt"
+
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 )
 
+// MergeExpandedWithPipeline adds expanded components of all 
ExternalTransforms to the existing pipeline
+func MergeExpandedWithPipeline(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
+       // Adding Expanded transforms to their counterparts in the Pipeline
+
+       for _, e := range edges {
+               if e.Op == graph.External {
+                       exp := e.External.Expanded
+                       if exp == nil {
+                               continue
+                       }
+                       id := fmt.Sprintf("e%v", e.ID())
+
+                       p.Requirements = append(p.Requirements, 
exp.Requirements...)
+
+                       // Adding components of the Expanded Transforms to the 
current Pipeline
+                       components, err := ExpandedComponents(exp)
+                       if err != nil {
+                               panic(err)
+                       }
+                       for k, v := range components.GetTransforms() {
+                               p.Components.Transforms[k] = v
+                       }
+                       for k, v := range components.GetPcollections() {
+                               p.Components.Pcollections[k] = v
+                       }
+                       for k, v := range components.GetWindowingStrategies() {
+                               p.Components.WindowingStrategies[k] = v
+                       }
+                       for k, v := range components.GetCoders() {
+                               p.Components.Coders[k] = v
+                       }
+                       for k, v := range components.GetEnvironments() {
+                               if k == "go" {
+                                       // This case is not an anomaly. It is 
expected to be always
+                                       // present. Any initial 
ExpansionRequest will have a
+                                       // component which requires the "go" 
environment. Scoping
+                                       // using unique namespace prevents 
collision.
+                                       continue
+                               }
+                               p.Components.Environments[k] = v
+                       }
+
+                       transform, err := ExpandedTransform(exp)
+                       if err != nil {
+                               panic(err)
+                       }
+                       p.Components.Transforms[id] = transform
+               }
+       }
+}
+
+// PurgeOutputInput remaps outputs from edge corresponding to an
+// ExternalTransform with the correct expanded outputs. All consumers of the
+// previous outputs are updated with new inputs.
+func PurgeOutputInput(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
+       idxMap := make(map[string]string)
+       components := p.GetComponents()
+
+       // Generating map (oldID -> newID) of outputs to be purged
+       for _, e := range edges {
+               if e.Op == graph.External {
+                       if e.External.Expanded == nil {
+                               continue
+                       }
+                       for tag, n := range ExternalOutputs(e) {
+                               nodeID := fmt.Sprintf("n%v", n.ID())
+
+                               transform, err := 
ExpandedTransform(e.External.Expanded)
+                               if err != nil {
+                                       panic(err)
+                               }
+                               expandedOutputs := transform.GetOutputs()
+                               var pcolID string
+                               if tag == graph.UnnamedOutputTag {
+                                       for _, pcolID = range expandedOutputs {
+                                               // easiest way to access map 
with one entry (key,value)
+                                       }
+                               } else {
+                                       pcolID = expandedOutputs[tag]
+                               }
+
+                               idxMap[nodeID] = pcolID
+                               delete(components.Pcollections, nodeID)
+                       }
+               }
+       }
+
+       // Updating all input ids to reflect the correct sources
+       for _, t := range components.GetTransforms() {
+               inputs := t.GetInputs()
+               for tag, nodeID := range inputs {
+                       if pcolID, exists := idxMap[nodeID]; exists {
+                               inputs[tag] = pcolID
+                       }
+               }
+       }
+
+}
+
+// VerifyNamedOutputs ensures the expanded outputs correspond to the correct 
and expected named outputs
+func VerifyNamedOutputs(ext *graph.ExternalTransform) {
+       transform, err := ExpandedTransform(ext.Expanded)
+       if err != nil {
+               panic(err)
+       }
+       expandedOutputs := transform.GetOutputs()
+
+       if len(expandedOutputs) != len(ext.OutputsMap) {
+               panic(errors.Errorf("mismatched number of named 
outputs:\nreceived - %v\nexpected - %v", len(expandedOutputs), 
len(ext.OutputsMap)))
+       }
+
+       for tag := range ext.OutputsMap {
+               _, exists := expandedOutputs[tag]
+               if tag != graph.UnnamedOutputTag && !exists {
+                       panic(errors.Errorf("missing named output in expanded 
transform: %v is expected in %v", tag, expandedOutputs))
+               }
+               if tag == graph.UnnamedOutputTag && len(expandedOutputs) > 1 {
+                       panic(errors.Errorf("mismatched number of unnamed 
outputs:\nreceived - %v\nexpected - 1", len(expandedOutputs)))
+               }
+       }
+}
+
+// ResolveOutputIsBounded updates each Output node with respect to the received
+// expanded components to reflect if it is bounded or not
+func ResolveOutputIsBounded(e *graph.MultiEdge, isBoundedUpdater 
func(*graph.Node, bool)) {
+       ext := e.External
+       exp := ext.Expanded
+       components, err := ExpandedComponents(exp)
+       if err != nil {
+               panic(err)
+       }
+       expandedPCollections := components.GetPcollections()
+       transform, err := ExpandedTransform(exp)
+       if err != nil {
+               panic(err)
+       }
+       expandedOutputs := transform.GetOutputs()
+
+       for tag, node := range ExternalOutputs(e) {
+               var id string
+               isBounded := true
+
+               switch tag {
+               case graph.UnnamedOutputTag:
+                       for _, id = range expandedOutputs {
+                               // easiest way to access map with one entry 
(key,value)
+                       }
+               default:
+                       id = expandedOutputs[tag]
+               }
+
+               if pcol, exists := expandedPCollections[id]; exists {
+                       if pcol.GetIsBounded() == pipepb.IsBounded_UNBOUNDED {
+                               isBounded = false
+                       }
+                       isBoundedUpdater(node, isBounded)
+               } else {
+                       panic(errors.Errorf("missing corresponsing pcollection 
of named output: %v is expected in %v", id, expandedPCollections))
+               }
+
+       }
+}
+
+// AddFakeImpulses adds an impulse transform as the producer for each input to
+// the root transform. Inputs need producers to form a correct pipeline.
+func AddFakeImpulses(p *pipepb.Pipeline) {
+       // For a pipeline consisting of only the external node edge, there will be
+       // a single root transform, which will be the external transform.
+       // Adding fake impulses per input to this external transform
+       transforms := p.GetComponents().GetTransforms()
+       ext := transforms[p.GetRootTransformIds()[0]]
+
+       for tag, id := range ext.GetInputs() {
+               key := fmt.Sprintf("%s_%s", "impulse", tag)
+               output := map[string]string{"out": id}
+
+               impulse := &pipepb.PTransform{
+                       UniqueName: key,
+                       Spec: &pipepb.FunctionSpec{
+                               Urn: URNImpulse,
+                       },
+                       Outputs: output,
+               }
+
+               transforms[key] = impulse
+       }
+
+}
+
+// RemoveFakeImpulses removes each fake impulse per input to the transform.
+// Multiple producers for one Input cannot be present.
+func RemoveFakeImpulses(c *pipepb.Components, ext *pipepb.PTransform) {
+       transforms := c.GetTransforms()
+       var impulseIDs []string
+
+       for tag := range ext.GetInputs() {
+               id := fmt.Sprintf("%s_%s", "impulse", tag)
+               impulseIDs = append(impulseIDs, id)
+       }
+
+       for _, id := range impulseIDs {
+               t := transforms[id]
+               if t.GetSpec().GetUrn() == URNImpulse {
+                       delete(transforms, id)
+

Review comment:
       Done.

##########
File path: sdks/go/pkg/beam/core/runtime/xlangx/expand.go
##########
@@ -18,19 +18,79 @@ package xlangx
 import (
        "context"
 
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/pipelinex"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
        pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
        "google.golang.org/grpc"
 )
 
-// Expand submits an external transform to be expanded by the expansion 
service.
-// The given transform should be the external transform, and the components are
-// any additional components necessary for the pipeline snippet.
+// Expand expands an unexpanded graph.ExternalTransform and returns the 
expanded
+// transform as a new graph.ExpandedTransform. This requires querying an
+// expansion service based on the configuration details within the
+// ExternalTransform.
+func Expand(edge *graph.MultiEdge, ext *graph.ExternalTransform) 
(*graph.ExpandedTransform, error) {
+       // Build the ExpansionRequest
+
+       // Obtaining the components and transform proto representing this 
transform
+       p, err := graphx.Marshal([]*graph.MultiEdge{edge}, &graphx.Options{})
+       if err != nil {
+               return nil, errors.Wrapf(err, "unable to generate proto 
representation of %v", ext)
+       }
+
+       transforms := p.GetComponents().GetTransforms()
+
+       // Transforms consist of only External transform and composites. 
Composites
+       // should be removed from proto before submitting expansion request.
+       extTransformID := p.GetRootTransformIds()[0]
+       extTransform := transforms[extTransformID]
+       for extTransform.UniqueName != "External" {
+               delete(transforms, extTransformID)
+               p, err := pipelinex.Normalize(p)
+               if err != nil {
+                       return nil, err
+               }
+               extTransformID = p.GetRootTransformIds()[0]
+               extTransform = transforms[extTransformID]
+       }
+
+       // Scoping the ExternalTransform with respect to its unique namespace, thus
+       // avoiding future collisions
+       AddNamespace(extTransform, p.GetComponents(), ext.Namespace)
+
+       graphx.AddFakeImpulses(p) // Inputs need to have sources
+       delete(transforms, extTransformID)
+
+       // Querying the expansion service
+       res, err := QueryExpansionService(context.Background(), 
p.GetComponents(), extTransform, ext.Namespace, ext.ExpansionAddr)
+       if err != nil {
+               return nil, err
+       }
+
+       // Handling ExpansionResponse
+
+       // Previously added fake impulses need to be removed to avoid having
+       // multiple sources to the same pcollection in the graph
+       graphx.RemoveFakeImpulses(res.GetComponents(), res.GetTransform())
+
+       exp := &graph.ExpandedTransform{
+               Components:   res.GetComponents(),
+               Transform:    res.GetTransform(),
+               Requirements: res.GetRequirements(),
+       }
+       return exp, nil
+}
+
+// QueryExpansionService submits an external transform to be expanded by the
+// expansion service. The given transform should be the external transform, and
+// the components are any additional components necessary for the pipeline
+// snippet.
 //
 // Users should generally call beam.CrossLanguage to access foreign transforms
 // rather than calling this function directly.
-func Expand(
+func QueryExpansionService(

Review comment:
       Yep, good point. I don't see it being necessary outside the package. 
Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to