youngoli commented on a change in pull request #13325: URL: https://github.com/apache/beam/pull/13325#discussion_r523383268
########## File path: sdks/go/pkg/beam/core/runtime/graphx/xlang.go ########## @@ -16,11 +16,222 @@ package graphx import ( + "fmt" + "github.com/apache/beam/sdks/go/pkg/beam/core/graph" "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1" ) +// MergeExpandedWithPipeline adds expanded components of all ExternalTransforms to the existing pipeline +func MergeExpandedWithPipeline(edges []*graph.MultiEdge, p *pipepb.Pipeline) { Review comment: Done. ########## File path: sdks/go/pkg/beam/core/runtime/graphx/xlang.go ########## @@ -16,11 +16,222 @@ package graphx import ( + "fmt" + "github.com/apache/beam/sdks/go/pkg/beam/core/graph" "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1" ) +// MergeExpandedWithPipeline adds expanded components of all ExternalTransforms to the existing pipeline +func MergeExpandedWithPipeline(edges []*graph.MultiEdge, p *pipepb.Pipeline) { + // Adding Expanded transforms to their counterparts in the Pipeline + + for _, e := range edges { + if e.Op == graph.External { + exp := e.External.Expanded + if exp == nil { + continue + } + id := fmt.Sprintf("e%v", e.ID()) + + p.Requirements = append(p.Requirements, exp.Requirements...) + + // Adding components of the Expanded Transforms to the current Pipeline + components, err := ExpandedComponents(exp) + if err != nil { + panic(err) + } + for k, v := range components.GetTransforms() { + p.Components.Transforms[k] = v + } + for k, v := range components.GetPcollections() { + p.Components.Pcollections[k] = v + } + for k, v := range components.GetWindowingStrategies() { + p.Components.WindowingStrategies[k] = v + } + for k, v := range components.GetCoders() { + p.Components.Coders[k] = v + } + for k, v := range components.GetEnvironments() { + if k == "go" { + // This case is not an anomaly. It is expected to be always + // present. 
Any initial ExpansionRequest will have a + // component which requires the "go" environment. Scoping + // using unique namespace prevents collision. + continue + } + p.Components.Environments[k] = v + } + + transform, err := ExpandedTransform(exp) + if err != nil { + panic(err) + } + p.Components.Transforms[id] = transform + } + } +} + +// PurgeOutputInput remaps outputs from edge corresponding to an +// ExternalTransform with the correct expanded outputs. All consumers of the +// previous outputs are updated with new inputs. +func PurgeOutputInput(edges []*graph.MultiEdge, p *pipepb.Pipeline) { + idxMap := make(map[string]string) + components := p.GetComponents() + + // Generating map (oldID -> newID) of outputs to be purged + for _, e := range edges { + if e.Op == graph.External { + if e.External.Expanded == nil { + continue + } + for tag, n := range ExternalOutputs(e) { + nodeID := fmt.Sprintf("n%v", n.ID()) + + transform, err := ExpandedTransform(e.External.Expanded) + if err != nil { + panic(err) + } + expandedOutputs := transform.GetOutputs() + var pcolID string + if tag == graph.UnnamedOutputTag { + for _, pcolID = range expandedOutputs { + // easiest way to access map with one entry (key,value) + } + } else { + pcolID = expandedOutputs[tag] + } + + idxMap[nodeID] = pcolID + delete(components.Pcollections, nodeID) + } + } + } + + // Updating all input ids to reflect the correct sources + for _, t := range components.GetTransforms() { + inputs := t.GetInputs() + for tag, nodeID := range inputs { + if pcolID, exists := idxMap[nodeID]; exists { + inputs[tag] = pcolID + } + } + } + +} + +// VerifyNamedOutputs ensures the expanded outputs correspond to the correct and expected named outputs +func VerifyNamedOutputs(ext *graph.ExternalTransform) { + transform, err := ExpandedTransform(ext.Expanded) + if err != nil { + panic(err) + } + expandedOutputs := transform.GetOutputs() + + if len(expandedOutputs) != len(ext.OutputsMap) { + panic(errors.Errorf("mismatched 
number of named outputs:\nreceived - %v\nexpected - %v", len(expandedOutputs), len(ext.OutputsMap))) + } + + for tag := range ext.OutputsMap { + _, exists := expandedOutputs[tag] + if tag != graph.UnnamedOutputTag && !exists { + panic(errors.Errorf("missing named output in expanded transform: %v is expected in %v", tag, expandedOutputs)) + } + if tag == graph.UnnamedOutputTag && len(expandedOutputs) > 1 { + panic(errors.Errorf("mismatched number of unnamed outputs:\nreceived - %v\nexpected - 1", len(expandedOutputs))) + } + } +} + +// ResolveOutputIsBounded updates each Output node with respect to the received +// expanded components to reflect if it is bounded or not +func ResolveOutputIsBounded(e *graph.MultiEdge, isBoundedUpdater func(*graph.Node, bool)) { + ext := e.External + exp := ext.Expanded + components, err := ExpandedComponents(exp) + if err != nil { + panic(err) + } + expandedPCollections := components.GetPcollections() + transform, err := ExpandedTransform(exp) + if err != nil { + panic(err) + } + expandedOutputs := transform.GetOutputs() + + for tag, node := range ExternalOutputs(e) { + var id string + isBounded := true + + switch tag { + case graph.UnnamedOutputTag: + for _, id = range expandedOutputs { + // easiest way to access map with one entry (key,value) + } + default: + id = expandedOutputs[tag] + } + + if pcol, exists := expandedPCollections[id]; exists { + if pcol.GetIsBounded() == pipepb.IsBounded_UNBOUNDED { + isBounded = false + } + isBoundedUpdater(node, isBounded) + } else { + panic(errors.Errorf("missing corresponsing pcollection of named output: %v is expected in %v", id, expandedPCollections)) + } + + } +} + +// AddFakeImpulses adds an impulse transform as the producer for each input to +// the root transform. Inputs need producers to form a correct pipeline. 
+func AddFakeImpulses(p *pipepb.Pipeline) { + // For a pipeline consisting of only the external node edge, there will be + // single root transform which will be the external transform. + // Adding fake impulses per input to this external transform + transforms := p.GetComponents().GetTransforms() + ext := transforms[p.GetRootTransformIds()[0]] + + for tag, id := range ext.GetInputs() { + key := fmt.Sprintf("%s_%s", "impulse", tag) + output := map[string]string{"out": id} + + impulse := &pipepb.PTransform{ + UniqueName: key, + Spec: &pipepb.FunctionSpec{ + Urn: URNImpulse, + }, + Outputs: output, + } + + transforms[key] = impulse + } + +} + +// RemoveFakeImpulses removes each fake impulse per input to the transform. +// Multiple producers for one Input cannot be present. +func RemoveFakeImpulses(c *pipepb.Components, ext *pipepb.PTransform) { + transforms := c.GetTransforms() + var impulseIDs []string + + for tag := range ext.GetInputs() { + id := fmt.Sprintf("%s_%s", "impulse", tag) + impulseIDs = append(impulseIDs, id) + } + + for _, id := range impulseIDs { + t := transforms[id] + if t.GetSpec().GetUrn() == URNImpulse { + delete(transforms, id) + + } + } +} + // ExpandedComponents type asserts the Components field with interface{} type // and returns its pipeline component proto representation func ExpandedComponents(exp *graph.ExpandedTransform) (*pipepb.Components, error) { Review comment: This one specifically is still used outside the package (in `xlangx.ResolveArtifacts`). The `ExternalOutputs` helper function is also used outside the package. And since some of them are used outside, I feel like all four of these helpers should be exported, just for consistency. 
########## File path: sdks/go/pkg/beam/core/runtime/graphx/xlang.go ########## @@ -16,11 +16,222 @@ package graphx import ( + "fmt" + "github.com/apache/beam/sdks/go/pkg/beam/core/graph" "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1" ) +// MergeExpandedWithPipeline adds expanded components of all ExternalTransforms to the existing pipeline +func MergeExpandedWithPipeline(edges []*graph.MultiEdge, p *pipepb.Pipeline) { + // Adding Expanded transforms to their counterparts in the Pipeline + + for _, e := range edges { + if e.Op == graph.External { + exp := e.External.Expanded + if exp == nil { + continue + } + id := fmt.Sprintf("e%v", e.ID()) + + p.Requirements = append(p.Requirements, exp.Requirements...) + + // Adding components of the Expanded Transforms to the current Pipeline + components, err := ExpandedComponents(exp) + if err != nil { + panic(err) + } + for k, v := range components.GetTransforms() { + p.Components.Transforms[k] = v + } + for k, v := range components.GetPcollections() { + p.Components.Pcollections[k] = v + } + for k, v := range components.GetWindowingStrategies() { + p.Components.WindowingStrategies[k] = v + } + for k, v := range components.GetCoders() { + p.Components.Coders[k] = v + } + for k, v := range components.GetEnvironments() { + if k == "go" { Review comment: Makes sense. Done. 
########## File path: sdks/go/pkg/beam/core/runtime/graphx/xlang.go ########## @@ -16,11 +16,222 @@ package graphx import ( + "fmt" + "github.com/apache/beam/sdks/go/pkg/beam/core/graph" "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1" ) +// MergeExpandedWithPipeline adds expanded components of all ExternalTransforms to the existing pipeline +func MergeExpandedWithPipeline(edges []*graph.MultiEdge, p *pipepb.Pipeline) { + // Adding Expanded transforms to their counterparts in the Pipeline + + for _, e := range edges { + if e.Op == graph.External { + exp := e.External.Expanded + if exp == nil { + continue + } + id := fmt.Sprintf("e%v", e.ID()) + + p.Requirements = append(p.Requirements, exp.Requirements...) + + // Adding components of the Expanded Transforms to the current Pipeline + components, err := ExpandedComponents(exp) + if err != nil { + panic(err) + } + for k, v := range components.GetTransforms() { + p.Components.Transforms[k] = v + } + for k, v := range components.GetPcollections() { + p.Components.Pcollections[k] = v + } + for k, v := range components.GetWindowingStrategies() { + p.Components.WindowingStrategies[k] = v + } + for k, v := range components.GetCoders() { + p.Components.Coders[k] = v + } + for k, v := range components.GetEnvironments() { + if k == "go" { + // This case is not an anomaly. It is expected to be always + // present. Any initial ExpansionRequest will have a + // component which requires the "go" environment. Scoping + // using unique namespace prevents collision. + continue + } + p.Components.Environments[k] = v + } + + transform, err := ExpandedTransform(exp) + if err != nil { + panic(err) + } + p.Components.Transforms[id] = transform + } + } +} + +// PurgeOutputInput remaps outputs from edge corresponding to an +// ExternalTransform with the correct expanded outputs. All consumers of the +// previous outputs are updated with new inputs. 
+func PurgeOutputInput(edges []*graph.MultiEdge, p *pipepb.Pipeline) { + idxMap := make(map[string]string) + components := p.GetComponents() + + // Generating map (oldID -> newID) of outputs to be purged + for _, e := range edges { + if e.Op == graph.External { + if e.External.Expanded == nil { + continue + } + for tag, n := range ExternalOutputs(e) { + nodeID := fmt.Sprintf("n%v", n.ID()) + + transform, err := ExpandedTransform(e.External.Expanded) + if err != nil { + panic(err) + } + expandedOutputs := transform.GetOutputs() + var pcolID string + if tag == graph.UnnamedOutputTag { + for _, pcolID = range expandedOutputs { + // easiest way to access map with one entry (key,value) + } + } else { + pcolID = expandedOutputs[tag] + } + + idxMap[nodeID] = pcolID + delete(components.Pcollections, nodeID) + } + } + } + + // Updating all input ids to reflect the correct sources + for _, t := range components.GetTransforms() { + inputs := t.GetInputs() + for tag, nodeID := range inputs { + if pcolID, exists := idxMap[nodeID]; exists { + inputs[tag] = pcolID + } + } + } + +} + +// VerifyNamedOutputs ensures the expanded outputs correspond to the correct and expected named outputs +func VerifyNamedOutputs(ext *graph.ExternalTransform) { + transform, err := ExpandedTransform(ext.Expanded) + if err != nil { + panic(err) + } + expandedOutputs := transform.GetOutputs() + + if len(expandedOutputs) != len(ext.OutputsMap) { + panic(errors.Errorf("mismatched number of named outputs:\nreceived - %v\nexpected - %v", len(expandedOutputs), len(ext.OutputsMap))) + } + + for tag := range ext.OutputsMap { + _, exists := expandedOutputs[tag] + if tag != graph.UnnamedOutputTag && !exists { + panic(errors.Errorf("missing named output in expanded transform: %v is expected in %v", tag, expandedOutputs)) + } + if tag == graph.UnnamedOutputTag && len(expandedOutputs) > 1 { + panic(errors.Errorf("mismatched number of unnamed outputs:\nreceived - %v\nexpected - 1", len(expandedOutputs))) + } + } +} 
+ +// ResolveOutputIsBounded updates each Output node with respect to the received +// expanded components to reflect if it is bounded or not +func ResolveOutputIsBounded(e *graph.MultiEdge, isBoundedUpdater func(*graph.Node, bool)) { + ext := e.External + exp := ext.Expanded + components, err := ExpandedComponents(exp) + if err != nil { + panic(err) + } + expandedPCollections := components.GetPcollections() + transform, err := ExpandedTransform(exp) + if err != nil { + panic(err) + } + expandedOutputs := transform.GetOutputs() + + for tag, node := range ExternalOutputs(e) { + var id string + isBounded := true + + switch tag { + case graph.UnnamedOutputTag: + for _, id = range expandedOutputs { + // easiest way to access map with one entry (key,value) + } + default: + id = expandedOutputs[tag] + } + + if pcol, exists := expandedPCollections[id]; exists { + if pcol.GetIsBounded() == pipepb.IsBounded_UNBOUNDED { + isBounded = false + } + isBoundedUpdater(node, isBounded) + } else { + panic(errors.Errorf("missing corresponsing pcollection of named output: %v is expected in %v", id, expandedPCollections)) + } + + } +} + +// AddFakeImpulses adds an impulse transform as the producer for each input to +// the root transform. Inputs need producers to form a correct pipeline. +func AddFakeImpulses(p *pipepb.Pipeline) { + // For a pipeline consisting of only the external node edge, there will be + // single root transform which will be the external transform. 
+ // Adding fake impulses per input to this external transform + transforms := p.GetComponents().GetTransforms() + ext := transforms[p.GetRootTransformIds()[0]] + + for tag, id := range ext.GetInputs() { + key := fmt.Sprintf("%s_%s", "impulse", tag) + output := map[string]string{"out": id} + + impulse := &pipepb.PTransform{ + UniqueName: key, + Spec: &pipepb.FunctionSpec{ + Urn: URNImpulse, + }, + Outputs: output, + } + + transforms[key] = impulse + } + +} + +// RemoveFakeImpulses removes each fake impulse per input to the transform. +// Multiple producers for one Input cannot be present. +func RemoveFakeImpulses(c *pipepb.Components, ext *pipepb.PTransform) { + transforms := c.GetTransforms() + var impulseIDs []string + + for tag := range ext.GetInputs() { + id := fmt.Sprintf("%s_%s", "impulse", tag) + impulseIDs = append(impulseIDs, id) + } + + for _, id := range impulseIDs { + t := transforms[id] + if t.GetSpec().GetUrn() == URNImpulse { + delete(transforms, id) + Review comment: Done. ########## File path: sdks/go/pkg/beam/core/runtime/xlangx/expand.go ########## @@ -18,19 +18,79 @@ package xlangx import ( "context" + "github.com/apache/beam/sdks/go/pkg/beam/core/graph" + "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx" + "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/pipelinex" "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1" pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1" "google.golang.org/grpc" ) -// Expand submits an external transform to be expanded by the expansion service. -// The given transform should be the external transform, and the components are -// any additional components necessary for the pipeline snippet. +// Expand expands an unexpanded graph.ExternalTransform and returns the expanded +// transform as a new graph.ExpandedTransform. 
This requires querying an +// expansion service based on the configuration details within the +// ExternalTransform. +func Expand(edge *graph.MultiEdge, ext *graph.ExternalTransform) (*graph.ExpandedTransform, error) { + // Build the ExpansionRequest + + // Obtaining the components and transform proto representing this transform + p, err := graphx.Marshal([]*graph.MultiEdge{edge}, &graphx.Options{}) + if err != nil { + return nil, errors.Wrapf(err, "unable to generate proto representation of %v", ext) + } + + transforms := p.GetComponents().GetTransforms() + + // Transforms consist of only External transform and composites. Composites + // should be removed from proto before submitting expansion request. + extTransformID := p.GetRootTransformIds()[0] + extTransform := transforms[extTransformID] + for extTransform.UniqueName != "External" { + delete(transforms, extTransformID) + p, err := pipelinex.Normalize(p) + if err != nil { + return nil, err + } + extTransformID = p.GetRootTransformIds()[0] + extTransform = transforms[extTransformID] + } + + // Scoping the ExternalTransform with respect to its unique namespace, thus + // avoiding future collisions + AddNamespace(extTransform, p.GetComponents(), ext.Namespace) + + graphx.AddFakeImpulses(p) // Inputs need to have sources + delete(transforms, extTransformID) + + // Querying the expansion service + res, err := QueryExpansionService(context.Background(), p.GetComponents(), extTransform, ext.Namespace, ext.ExpansionAddr) + if err != nil { + return nil, err + } + + // Handling ExpansionResponse + + // Previously added fake impulses need to be removed to avoid having + // multiple sources to the same pcollection in the graph + graphx.RemoveFakeImpulses(res.GetComponents(), res.GetTransform()) + + exp := &graph.ExpandedTransform{ + Components: res.GetComponents(), + Transform: res.GetTransform(), + Requirements: res.GetRequirements(), + } + return exp, nil +} + +// QueryExpansionService submits an external transform to 
be expanded by the +// expansion service. The given transform should be the external transform, and +// the components are any additional components necessary for the pipeline +// snippet. // // Users should generally call beam.CrossLanguage to access foreign transforms // rather than calling this function directly. -func Expand( +func QueryExpansionService( Review comment: Yep, good point. I don't see it being necessary outside the package. Done. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org