lostluck commented on a change in pull request #14397:
URL: https://github.com/apache/beam/pull/14397#discussion_r606357992



##########
File path: sdks/go/pkg/beam/core/runtime/pipelinex/replace.go
##########
@@ -243,3 +245,33 @@ func findFreeName(seen map[string]bool, name string) string {
                }
        }
 }
+
+// ApplySdkImageOverrides takes a pipeline and a map of patterns to overrides,
+// and proceeds to replace matching ContainerImages in any Environments
+// present in the pipeline.
+func ApplySdkImageOverrides(p *pipepb.Pipeline, patterns map[string]string) error {
+       if len(patterns) == 0 {
+               return nil
+       }
+
+       for _, env := range p.GetComponents().GetEnvironments() {
+               var payload pipepb.DockerPayload
+               if err := proto.Unmarshal(env.GetPayload(), &payload); err != nil {
+                       return err
+               }
+               oldImg := payload.GetContainerImage()
+               for pattern, replacement := range patterns {
+                       re, err := regexp.Compile(pattern)

Review comment:
       No action required; this is just a callout to be wary of nested for
   loops and repeated work.
   
   My main concern is that each pattern is recompiled once per environment.
   That is probably not terrible, since this runs at pipeline construction
   time and both the patterns and the environments will be few in number.
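
One way to avoid the repeated work is to compile every pattern once, before looping over environments. The following is a minimal sketch, not the PR's implementation: `compiledOverride`, `compileOverrides`, and `overrideImage` are hypothetical names, and it assumes a match replaces the whole image string.

```go
package main

import (
	"fmt"
	"regexp"
)

// compiledOverride pairs a pre-compiled pattern with its replacement image.
// Hypothetical helper type, not part of the Beam SDK.
type compiledOverride struct {
	re          *regexp.Regexp
	replacement string
}

// compileOverrides compiles each pattern exactly once, so the compile cost
// no longer scales with the number of environments.
func compileOverrides(patterns map[string]string) ([]compiledOverride, error) {
	out := make([]compiledOverride, 0, len(patterns))
	for pattern, replacement := range patterns {
		re, err := regexp.Compile(pattern)
		if err != nil {
			return nil, fmt.Errorf("invalid override pattern %q: %w", pattern, err)
		}
		out = append(out, compiledOverride{re: re, replacement: replacement})
	}
	return out, nil
}

// overrideImage returns the replacement of the first override whose pattern
// matches img, or img unchanged if nothing matches. Assumption: a match
// replaces the entire image string.
func overrideImage(img string, overrides []compiledOverride) string {
	for _, o := range overrides {
		if o.re.MatchString(img) {
			return o.replacement
		}
	}
	return img
}
```

A caller would invoke `compileOverrides` once and then call `overrideImage` for each environment's container image inside the loop.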

##########
File path: sdks/go/pkg/beam/core/runtime/xlangx/namespace.go
##########
@@ -81,36 +76,18 @@ func addWindowingStrategyID(c *pipepb.Components, idMap 
map[string]string, wid s
        return idMap[wid]
 }
 
-func addEnvironmentID(c *pipepb.Components, idMap map[string]string, eid string, newID func(string) string) string {
-       if _, exists := idMap[eid]; exists {
-               return idMap[eid]
-       }
-
-       environment, exists := c.Environments[eid]
-       if !exists {
-               panic(errors.Errorf("attempted to add namespace to missing windowing strategy id: %v not in %v", eid, c.Environments))
-       }
-
-       idMap[eid] = newID(eid)
-
-       // Updating Environments map
-       c.Environments[idMap[eid]] = environment
-       delete(c.Environments, eid)
-
-       return idMap[eid]
-}
-
 func addNamespace(t *pipepb.PTransform, c *pipepb.Components, namespace string) {
        newID := func(id string) string {
                return fmt.Sprintf("%v@%v", id, namespace)
        }
 
        idMap := make(map[string]string)
 
-       // Update Environment ID of PTransform
-       if t.EnvironmentId != "" {
-               t.EnvironmentId = addEnvironmentID(c, idMap, t.EnvironmentId, newID)
-       }
+       // TODO: Currently environments are not namespaced. This works under the

Review comment:
       Consider including the JIRA tag for historical context. Alternatively,
   instead of TODO (which implies work that should likely be done), use Note:
   or drop the prefix entirely, since all comments are notes to our future
   selves.
   
   As for the content, we would probably have the Go ExpansionService that
   handles these calls do the namespacing on the response itself. That way it
   can build the pipeline graph normally, and we simply run the replacement
   there, where we already know the components are foreign rather than part of
   the primary pipeline.

##########
File path: sdks/go/pkg/beam/core/runtime/pipelinex/replace.go
##########
@@ -243,3 +245,33 @@ func findFreeName(seen map[string]bool, name string) string {
                }
        }
 }
+
+// ApplySdkImageOverrides takes a pipeline and a map of patterns to overrides,
+// and proceeds to replace matching ContainerImages in any Environments
+// present in the pipeline.
+func ApplySdkImageOverrides(p *pipepb.Pipeline, patterns map[string]string) error {

Review comment:
       Consider documenting that any given environment is expected to match
   only a single pattern, and that if multiple patterns match, which one is
   applied is arbitrary (because map iteration order is random).
   
   There's no good way to handle conflicts between multiple matches. I suspect
   the most we can do is say the behavior is undefined in the flag description
   itself, in addition to the doc comment change described in the previous
   paragraph.
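
To make the ambiguity concrete, here is a minimal sketch of how a caller could detect that more than one pattern matches an image. It is not something the PR or this review proposes: `matchingPatterns` is a hypothetical helper, and sorting the result is only there to make the output reproducible despite random map iteration.

```go
package main

import (
	"regexp"
	"sort"
)

// matchingPatterns returns, in sorted order, every pattern in patterns that
// matches img. More than one result means the override that gets applied
// would depend on map iteration order. Hypothetical helper for illustration.
func matchingPatterns(img string, patterns map[string]string) ([]string, error) {
	var matches []string
	for pattern := range patterns {
		re, err := regexp.Compile(pattern)
		if err != nil {
			return nil, err
		}
		if re.MatchString(img) {
			matches = append(matches, pattern)
		}
	}
	// Sort so the result does not depend on map iteration order.
	sort.Strings(matches)
	return matches, nil
}
```

With such a check, a length greater than one could be logged as a warning, while the documented contract stays "undefined when several patterns match".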

##########
File path: sdks/go/pkg/beam/options/jobopts/options.go
##########
@@ -31,6 +31,18 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/log"
 )
 
+func init() {
+       flag.Var(&SdkHarnessContainerImageOverrides,
+               "sdk_harness_container_image_override",
+               "Overrides for SDK harness container images. Could be for the "+
+                       "local SDK or for a remote SDK that pipeline has to support due "+
+                       "to a cross-language transform. Each entry consist of two values "+
+                       "separated by a comma where first value gives a regex to "+
+                       "identify the container image to override and the second value "+
+                       "gives the replacement container image. Multiple entries can be "+
+                       "specified by using this flag multiple times.")

Review comment:
       Just confirming: does this part match the semantics of the equivalent
   flag in the other SDKs?
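
For reference, the semantics described in the flag's help text (one `regex,replacement` entry per use, repeatable) can be sketched with a custom `flag.Value`. This is an illustration under stated assumptions, not the PR's code: `imageOverrides` and `parseOverrideArgs` are hypothetical names, and splitting at the last comma is an assumption made here so that a comma inside the regex does not break parsing.

```go
package main

import (
	"flag"
	"fmt"
	"io"
	"strings"
)

// imageOverrides is a hypothetical flag.Value that collects repeated
// "regex,replacement" entries into a map of pattern to replacement image.
type imageOverrides map[string]string

// String implements flag.Value.
func (o *imageOverrides) String() string {
	if o == nil {
		return ""
	}
	return fmt.Sprint(map[string]string(*o))
}

// Set implements flag.Value. It is called once per occurrence of the flag.
// Assumption: the entry is split at the LAST comma, so the regex itself may
// contain commas while the replacement image may not.
func (o *imageOverrides) Set(value string) error {
	i := strings.LastIndex(value, ",")
	if i <= 0 || i == len(value)-1 {
		return fmt.Errorf("expected \"regex,replacement\", got %q", value)
	}
	if *o == nil {
		*o = imageOverrides{}
	}
	(*o)[value[:i]] = value[i+1:]
	return nil
}

// parseOverrideArgs parses args with a private FlagSet, so the sketch does
// not touch the global flag.CommandLine.
func parseOverrideArgs(args []string) (imageOverrides, error) {
	var o imageOverrides
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	fs.SetOutput(io.Discard) // keep parse errors out of stderr in this sketch
	fs.Var(&o, "sdk_harness_container_image_override", "regex,replacement (repeatable)")
	if err := fs.Parse(args); err != nil {
		return nil, err
	}
	return o, nil
}
```

Passing the flag twice yields a map with two entries; an entry without a comma is rejected at parse time.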

##########
File path: sdks/go/pkg/beam/options/jobopts/options.go
##########
@@ -31,6 +31,18 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/log"
 )
 
+func init() {
+       flag.Var(&SdkHarnessContainerImageOverrides,
+               "sdk_harness_container_image_override",
+               "Overrides for SDK harness container images. Could be for the "+
+                       "local SDK or for a remote SDK that pipeline has to support due "+
+                       "to a cross-language transform. Each entry consist of two values "+

Review comment:
       ```suggestion
                        "to a cross-language transform. Each entry consists of two values "+
   ```

##########
File path: sdks/go/pkg/beam/runners/dataflow/dataflow.go
##########
@@ -181,11 +183,15 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error)
        if err != nil {
                return nil, err
        }
-       enviroment, err := graphx.CreateEnvironment(ctx, jobopts.GetEnvironmentUrn(ctx), getContainerImage)
+       environment, err := graphx.CreateEnvironment(ctx, jobopts.GetEnvironmentUrn(ctx), getContainerImage)
        if err != nil {
                return nil, errors.WithContext(err, "generating model pipeline")
        }
-       model, err := graphx.Marshal(edges, &graphx.Options{Environment: enviroment})
+       model, err := graphx.Marshal(edges, &graphx.Options{Environment: environment})
+       if err != nil {
+               return nil, errors.WithContext(err, "generating model pipeline")
+       }
+       err = pipelinex.ApplySdkImageOverrides(model, jobopts.GetSdkImageOverrides())
        if err != nil {
                return nil, errors.WithContext(err, "generating model pipeline")

Review comment:
       ```suggestion
                return nil, errors.WithContext(err, "applying container image overrides")
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

