This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e31e8855ad9 [#27839][Go SDK] Write pipeline options to a file instead of reading from a flag. (#31482)
e31e8855ad9 is described below

commit e31e8855ad9a7767c79700e7cd5ea31a419a7997
Author: Robert Burke <lostl...@users.noreply.github.com>
AuthorDate: Tue Jun 4 05:17:22 2024 -0700

    [#27839][Go SDK] Write pipeline options to a file instead of reading from a flag. (#31482)
    
    * [#27839] Move pipeline options file creation to tools package.
    
    * Write options to a file in the container instead of burdening the command line.
    
    ---------
    
    Co-authored-by: lostluck <13907733+lostl...@users.noreply.github.com>
---
 sdks/go/container/boot.go                          |  4 ++-
 sdks/go/container/tools/pipeline_options.go        | 39 ++++++++++++++++++++++
 sdks/go/pkg/beam/core/runtime/harness/init/init.go | 17 +++++++++-
 sdks/java/container/boot.go                        | 24 +++----------
 4 files changed, 62 insertions(+), 22 deletions(-)

diff --git a/sdks/go/container/boot.go b/sdks/go/container/boot.go
index 1a5e154ace7..15f9ecc101c 100644
--- a/sdks/go/container/boot.go
+++ b/sdks/go/container/boot.go
@@ -137,7 +137,9 @@ func main() {
                "--logging_endpoint=" + *loggingEndpoint,
                "--control_endpoint=" + *controlEndpoint,
                "--semi_persist_dir=" + *semiPersistDir,
-               "--options=" + options,
+       }
+       if err := tools.MakePipelineOptionsFileAndEnvVar(options); err != nil {
+               logger.Fatalf(ctx, "Failed to load pipeline options to worker: 
%v", err)
        }
        if info.GetStatusEndpoint() != nil {
                os.Setenv("STATUS_ENDPOINT", info.GetStatusEndpoint().GetUrl())
diff --git a/sdks/go/container/tools/pipeline_options.go 
b/sdks/go/container/tools/pipeline_options.go
new file mode 100644
index 00000000000..7b46d8fa8c8
--- /dev/null
+++ b/sdks/go/container/tools/pipeline_options.go
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tools
+
+import (
+       "fmt"
+       "os"
+)
+
+// MakePipelineOptionsFileAndEnvVar writes the pipeline options to a file.
+// Assumes the options string is JSON formatted; it is written verbatim.
+//
+// The file is created as pipeline_options.json in the current working
+// directory, and that (relative) file name is stored in the
+// PIPELINE_OPTIONS_FILE environment variable for access by the SDK.
+// An error is returned if the file cannot be created, written, or closed,
+// or if the environment variable cannot be set.
+func MakePipelineOptionsFileAndEnvVar(options string) error {
+       fn := "pipeline_options.json"
+       f, err := os.Create(fn)
+       if err != nil {
+               return fmt.Errorf("unable to create %v: %w", fn, err)
+       }
+       if _, err := f.WriteString(options); err != nil {
+               f.Close() // Best effort: the write error is the one worth reporting.
+               return fmt.Errorf("error writing %v: %w", f.Name(), err)
+       }
+       // Close explicitly instead of deferring: a failed Close can mean the
+       // options never fully reached the file, so it must not be ignored.
+       if err := f.Close(); err != nil {
+               return fmt.Errorf("error closing %v: %w", f.Name(), err)
+       }
+       if err := os.Setenv("PIPELINE_OPTIONS_FILE", f.Name()); err != nil {
+               return fmt.Errorf("unable to set PIPELINE_OPTIONS_FILE: %w", err)
+       }
+       return nil
+}
diff --git a/sdks/go/pkg/beam/core/runtime/harness/init/init.go 
b/sdks/go/pkg/beam/core/runtime/harness/init/init.go
index 468708b2917..8a5b45fea5e 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/init/init.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/init/init.go
@@ -51,7 +51,7 @@ var (
        controlEndpoint = flag.String("control_endpoint", "", "Local control 
gRPC endpoint (required in worker mode).")
        //lint:ignore U1000 semiPersistDir flag is passed in through the boot 
container, will need to be removed later
        semiPersistDir = flag.String("semi_persist_dir", "/tmp", "Local 
semi-persistent directory (optional in worker mode).")
-       options        = flag.String("options", "", "JSON-encoded pipeline 
options (required in worker mode).")
+       options        = flag.String("options", "", "JSON-encoded pipeline 
options (required in worker mode). (deprecated)")
 )
 
 type exitMode int
@@ -93,6 +93,21 @@ func hook() {
        // will be captured by the framework -- which may not be functional if
        // harness.Main returns. We want to be sure any error makes it out.
 
+       pipelineOptionsFilename := os.Getenv("PIPELINE_OPTIONS_FILE")
+       if pipelineOptionsFilename != "" {
+               if *options != "" {
+                       fmt.Fprintf(os.Stderr, "WARNING: env variable PIPELINE_OPTIONS_FILE set but options flag populated. Potentially bad container loader. Flag value before overwrite: %v\n", *options)
+               }
+               contents, err := os.ReadFile(pipelineOptionsFilename)
+               if err != nil {
+                       fmt.Fprintf(os.Stderr, "Failed to read pipeline options file '%v': %v\n", pipelineOptionsFilename, err)
+                       os.Exit(1)
+               }
+               // Overwrite the flag value to stay consistent with the legacy flag processing.
+               *options = string(contents)
+       }
+       // Load in pipeline options from the flag string. Used for both the new options file path
+       // and the older flag approach.
        if *options != "" {
                var opt runtime.RawOptionsWrapper
                if err := json.Unmarshal([]byte(*options), &opt); err != nil {
diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go
index f7fd7437c88..ceda3d2be66 100644
--- a/sdks/java/container/boot.go
+++ b/sdks/java/container/boot.go
@@ -124,7 +124,7 @@ func main() {
        // (3) Invoke the Java harness, preserving artifact ordering in 
classpath.
 
        os.Setenv("HARNESS_ID", *id)
-       if err := makePipelineOptionsFile(options); err != nil {
+       if err := tools.MakePipelineOptionsFileAndEnvVar(options); err != nil {
                logger.Fatalf(ctx, "Failed to load pipeline options to worker: 
%v", err)
        }
        os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", 
proto.MarshalTextString(&pipepb.ApiServiceDescriptor{Url: *loggingEndpoint}))
@@ -247,29 +247,13 @@ func main() {
        logger.Fatalf(ctx, "Java exited: %v", execx.Execute("java", args...))
 }
 
-// makePipelineOptionsFile writes the pipeline options to a file.
-// Assumes the options string is JSON formatted.
-func makePipelineOptionsFile(options string) error {
-       fn := "pipeline_options.json"
-       f, err := os.Create(fn)
-       if err != nil {
-               return fmt.Errorf("unable to create %v: %w", fn, err)
-       }
-       defer f.Close()
-       if _, err := f.WriteString(options); err != nil {
-               return fmt.Errorf("error writing %v: %w", f.Name(), err)
-       }
-       os.Setenv("PIPELINE_OPTIONS_FILE", f.Name())
-       return nil
-}
-
 // heapSizeLimit returns 80% of the runner limit, if provided. If not provided,
 // it returns 70% of the physical memory on the machine. If it cannot determine
 // that value, it returns 1GB. This is an imperfect heuristic. It aims to
 // ensure there is memory for non-heap use and other overhead, while also not
-// underutilizing the machine. if set_recommended_max_xmx experiment is 
enabled, 
-// sets xmx to 32G. Under 32G JVM enables CompressedOops. CompressedOops 
-// utilizes memory more efficiently, and has positive impact on GC performance 
+// underutilizing the machine. if set_recommended_max_xmx experiment is 
enabled,
+// sets xmx to 32G. Under 32G JVM enables CompressedOops. CompressedOops
+// utilizes memory more efficiently, and has positive impact on GC performance
 // and cache hit rate.
 func heapSizeLimit(info *fnpb.ProvisionInfo, setRecommendedMaxXmx bool) uint64 
{
        if setRecommendedMaxXmx {

Reply via email to