lostluck commented on a change in pull request #14397:
URL: https://github.com/apache/beam/pull/14397#discussion_r606357992
##########
File path: sdks/go/pkg/beam/core/runtime/pipelinex/replace.go
##########
@@ -243,3 +245,33 @@ func findFreeName(seen map[string]bool, name string)
string {
}
}
}
+
+// ApplySdkImageOverrides takes a pipeline and a map of patterns to overrides,
+// and proceeds to replace matching ContainerImages in any Environments
+// present in the pipeline.
+func ApplySdkImageOverrides(p *pipepb.Pipeline, patterns map[string]string)
error {
+ if len(patterns) == 0 {
+ return nil
+ }
+
+ for _, env := range p.GetComponents().GetEnvironments() {
+ var payload pipepb.DockerPayload
+ if err := proto.Unmarshal(env.GetPayload(), &payload); err !=
nil {
+ return err
+ }
+ oldImg := payload.GetContainerImage()
+ for pattern, replacement := range patterns {
+ re, err := regexp.Compile(pattern)
Review comment:
No action required, just a call out to be wary of nested for loops and
repeated work.
My main concern here is that we're compiling the patterns multiple times
(each pattern at most once per environment), which is probably not going to be
terrible, since this happens at construction time and both patterns and
environments are going to be limited in number.
##########
File path: sdks/go/pkg/beam/core/runtime/xlangx/namespace.go
##########
@@ -81,36 +76,18 @@ func addWindowingStrategyID(c *pipepb.Components, idMap
map[string]string, wid s
return idMap[wid]
}
-func addEnvironmentID(c *pipepb.Components, idMap map[string]string, eid
string, newID func(string) string) string {
- if _, exists := idMap[eid]; exists {
- return idMap[eid]
- }
-
- environment, exists := c.Environments[eid]
- if !exists {
- panic(errors.Errorf("attempted to add namespace to missing
windowing strategy id: %v not in %v", eid, c.Environments))
- }
-
- idMap[eid] = newID(eid)
-
- // Updating Environments map
- c.Environments[idMap[eid]] = environment
- delete(c.Environments, eid)
-
- return idMap[eid]
-}
-
func addNamespace(t *pipepb.PTransform, c *pipepb.Components, namespace
string) {
newID := func(id string) string {
return fmt.Sprintf("%v@%v", id, namespace)
}
idMap := make(map[string]string)
- // Update Environment ID of PTransform
- if t.EnvironmentId != "" {
- t.EnvironmentId = addEnvironmentID(c, idMap, t.EnvironmentId,
newID)
- }
+ // TODO: Currently environments are not namespaced. This works under the
Review comment:
Consider including the JIRA tag for history context, or instead of TODO
(which implies work that should likely be done) use Note: or avoid a prefix
entirely, since all comments are notes to our future selves.
As for the content, it seems probable that we'd make sure the Go
ExpansionService that handles these calls would do the namespacing on the
response itself. That way it can build the pipeline graph normally, and we
simply run the replacement there, where we are already certain that it's a
foreign component, rather than the primary pipeline.
##########
File path: sdks/go/pkg/beam/core/runtime/pipelinex/replace.go
##########
@@ -243,3 +245,33 @@ func findFreeName(seen map[string]bool, name string)
string {
}
}
}
+
+// ApplySdkImageOverrides takes a pipeline and a map of patterns to overrides,
+// and proceeds to replace matching ContainerImages in any Environments
+// present in the pipeline.
+func ApplySdkImageOverrides(p *pipepb.Pipeline, patterns map[string]string)
error {
Review comment:
Consider documenting that any given environment is expected to match only
a single pattern for replacement, and that if multiple patterns would match,
it's arbitrary which one will be applied (due to map iteration ordering being
random).
There's no good way to handle such conflict cases for multiple matches. I
suspect the most we can do is say it's undefined in the flag itself, as well as
the commentary change in the previous paragraph.
##########
File path: sdks/go/pkg/beam/options/jobopts/options.go
##########
@@ -31,6 +31,18 @@ import (
"github.com/apache/beam/sdks/go/pkg/beam/log"
)
+func init() {
+ flag.Var(&SdkHarnessContainerImageOverrides,
+ "sdk_harness_container_image_override",
+ "Overrides for SDK harness container images. Could be for the "+
+ "local SDK or for a remote SDK that pipeline has to
support due "+
+ "to a cross-language transform. Each entry consist of
two values "+
+ "separated by a comma where first value gives a regex
to "+
+ "identify the container image to override and the
second value "+
+ "gives the replacement container image. Multiple
entries can be "+
+ "specified by using this flag multiple times.")
Review comment:
Just confirming, this part matches the semantics of the equivalent flag
in the other SDKs?
##########
File path: sdks/go/pkg/beam/options/jobopts/options.go
##########
@@ -31,6 +31,18 @@ import (
"github.com/apache/beam/sdks/go/pkg/beam/log"
)
+func init() {
+ flag.Var(&SdkHarnessContainerImageOverrides,
+ "sdk_harness_container_image_override",
+ "Overrides for SDK harness container images. Could be for the "+
+ "local SDK or for a remote SDK that pipeline has to
support due "+
+ "to a cross-language transform. Each entry consist of
two values "+
Review comment:
```suggestion
"to a cross-language transform. Each entry consists of
two values "+
```
##########
File path: sdks/go/pkg/beam/runners/dataflow/dataflow.go
##########
@@ -181,11 +183,15 @@ func Execute(ctx context.Context, p *beam.Pipeline)
(beam.PipelineResult, error)
if err != nil {
return nil, err
}
- enviroment, err := graphx.CreateEnvironment(ctx,
jobopts.GetEnvironmentUrn(ctx), getContainerImage)
+ environment, err := graphx.CreateEnvironment(ctx,
jobopts.GetEnvironmentUrn(ctx), getContainerImage)
if err != nil {
return nil, errors.WithContext(err, "generating model pipeline")
}
- model, err := graphx.Marshal(edges, &graphx.Options{Environment:
enviroment})
+ model, err := graphx.Marshal(edges, &graphx.Options{Environment:
environment})
+ if err != nil {
+ return nil, errors.WithContext(err, "generating model pipeline")
+ }
+ err = pipelinex.ApplySdkImageOverrides(model,
jobopts.GetSdkImageOverrides())
if err != nil {
return nil, errors.WithContext(err, "generating model pipeline")
Review comment:
```suggestion
return nil, errors.WithContext(err, "applying container image
overrides")
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org