This is an automated email from the ASF dual-hosted git repository. riteshghorse pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 395c4d15bb7 [Python] Get available python version and use it for Python SDK harness boot entry point (#28046) 395c4d15bb7 is described below commit 395c4d15bb74351b0aa020dc7463de8d85766e07 Author: Ritesh Ghorse <riteshgho...@gmail.com> AuthorDate: Wed Aug 23 13:19:49 2023 -0400 [Python] Get available python version and use it for Python SDK harness boot entry point (#28046) * add env variable config, use GetPythonVersion * fix error message * add spaces --- .../core/runtime/xlangx/expansionx/download.go | 15 ++++++++-- .../runtime/xlangx/expansionx/download_test.go | 32 ++++++++++++++++++++++ sdks/python/container/boot.go | 13 +++++++-- sdks/python/container/piputil.go | 19 +++++++++---- sdks/python/expansion-service-container/boot.go | 9 ++++-- 5 files changed, 76 insertions(+), 12 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go index af3b495720b..e5fff103967 100644 --- a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go +++ b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go @@ -382,20 +382,29 @@ func jarExists(jarPath string) bool { return err == nil } -func getPythonVersion() (string, error) { +// GetPythonVersion returns the Python version to use. It checks for +// env variable PYTHON_PATH and returns it if set. +// If no PYTHON_PATH is defined then it checks for python or python3 +// and returns that. Otherwise it returns an error. +func GetPythonVersion() (string, error) { + if pythonPath := os.Getenv("PYTHON_PATH"); pythonPath != "" { + return pythonPath, nil + } for _, v := range []string{"python", "python3"} { cmd := exec.Command(v, "--version") if err := cmd.Run(); err == nil { return v, nil } } - return "", fmt.Errorf("no python installation found") + return "", errors.New("no python installation found. 
If you use a " + + "custom container image, please check if python/python3 is available or specify the " + + "full path to the python interpreter in PYTHON_PATH environment variable") } // SetUpPythonEnvironment sets up the virtual ennvironment required for the // Apache Beam Python SDK to run an expansion service module. func SetUpPythonEnvironment(extraPackage string) (string, error) { - py, err := getPythonVersion() + py, err := GetPythonVersion() if err != nil { return "", fmt.Errorf("no python installation found: %v", err) } diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download_test.go b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download_test.go index 9779c236189..65e72342a9b 100644 --- a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download_test.go +++ b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download_test.go @@ -217,3 +217,35 @@ func TestGetJar_dev(t *testing.T) { t.Errorf("error message does not contain gradle command %v for user, got message: %v", gradleTarget, err) } } + +func TestGetPythonVersion(t *testing.T) { + tests := []struct { + name string + PYTHON_PATH string + }{ + { + name: "PYTHON_PATH set", + PYTHON_PATH: "/bin/python", + }, + { + name: "PYTHON_PATH not set", + PYTHON_PATH: "", + }, + } + + for _, test := range tests { + if test.PYTHON_PATH != "" { + os.Setenv("PYTHON_PATH", test.PYTHON_PATH) + } + pythonVersion, err := GetPythonVersion() + if err != nil { + t.Errorf("python installation not found: %v, when PYTHON_PATH=%v", err, test.PYTHON_PATH) + } + if test.PYTHON_PATH != "" && pythonVersion != test.PYTHON_PATH { + t.Errorf("incorrect PYTHON_PATH, want: %v, got: %v", test.PYTHON_PATH, pythonVersion) + } + if test.PYTHON_PATH != "" { + os.Unsetenv(test.PYTHON_PATH) + } + } +} diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go index c67239f0564..e7b11daa397 100644 --- a/sdks/python/container/boot.go +++ b/sdks/python/container/boot.go @@ -36,6 +36,7 @@ import ( 
"github.com/apache/beam/sdks/v2/go/container/tools" "github.com/apache/beam/sdks/v2/go/pkg/beam/artifact" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" "github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx" "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx" @@ -91,7 +92,11 @@ func main() { "--container_executable=/opt/apache/beam/boot", } log.Printf("Starting worker pool %v: python %v", workerPoolId, strings.Join(args, " ")) - if err := execx.Execute("python", args...); err != nil { + pythonVersion, err := expansionx.GetPythonVersion() + if err != nil { + log.Fatalf("Python SDK worker pool exited with error: %v", err) + } + if err := execx.Execute(pythonVersion, args...); err != nil { log.Fatalf("Python SDK worker pool exited with error: %v", err) } log.Print("Python SDK worker pool exited.") @@ -336,7 +341,11 @@ func setupVenv(ctx context.Context, logger *tools.Logger, baseDir, workerId stri if err := os.MkdirAll(dir, 0750); err != nil { return "", fmt.Errorf("failed to create Python venv directory: %s", err) } - if err := execx.Execute("python", "-m", "venv", "--system-site-packages", dir); err != nil { + pythonVersion, err := expansionx.GetPythonVersion() + if err != nil { + return "", err + } + if err := execx.Execute(pythonVersion, "-m", "venv", "--system-site-packages", dir); err != nil { return "", fmt.Errorf("python venv initialization failed: %s", err) } diff --git a/sdks/python/container/piputil.go b/sdks/python/container/piputil.go index 720bf372c53..350bda049d9 100644 --- a/sdks/python/container/piputil.go +++ b/sdks/python/container/piputil.go @@ -26,11 +26,16 @@ import ( "path/filepath" "strings" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx" "github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx" ) // pipInstallRequirements installs the given requirement, if present. 
func pipInstallRequirements(files []string, dir, name string) error { + pythonVersion, err := expansionx.GetPythonVersion() + if err != nil { + return err + } for _, file := range files { if file == name { // We run the install process in two rounds in order to avoid as much @@ -38,14 +43,14 @@ func pipInstallRequirements(files []string, dir, name string) error { // option will make sure that only things staged in the worker will be // used without following their dependencies. args := []string{"-m", "pip", "install", "-q", "-r", filepath.Join(dir, name), "--no-cache-dir", "--disable-pip-version-check", "--no-index", "--no-deps", "--find-links", dir} - if err := execx.Execute("python", args...); err != nil { + if err := execx.Execute(pythonVersion, args...); err != nil { fmt.Println("Some packages could not be installed solely from the requirements cache. Installing packages from PyPI.") } // The second install round opens up the search for packages on PyPI and // also installs dependencies. The key is that if all the packages have // been installed in the first round then this command will be a no-op. args = []string{"-m", "pip", "install", "-q", "-r", filepath.Join(dir, name), "--no-cache-dir", "--disable-pip-version-check", "--find-links", dir} - return execx.Execute("python", args...) + return execx.Execute(pythonVersion, args...) } } return nil @@ -65,6 +70,10 @@ func isPackageInstalled(pkgName string) bool { // pipInstallPackage installs the given package, if present. func pipInstallPackage(files []string, dir, name string, force, optional bool, extras []string) error { + pythonVersion, err := expansionx.GetPythonVersion() + if err != nil { + return err + } for _, file := range files { if file == name { var packageSpec = name @@ -90,17 +99,17 @@ func pipInstallPackage(files []string, dir, name string, force, optional bool, e // installed if necessary. This achieves our goal outlined above. 
args := []string{"-m", "pip", "install", "-q", "--no-cache-dir", "--disable-pip-version-check", "--upgrade", "--force-reinstall", "--no-deps", filepath.Join(dir, packageSpec)} - err := execx.Execute("python", args...) + err := execx.Execute(pythonVersion, args...) if err != nil { return err } args = []string{"-m", "pip", "install", "-q", "--no-cache-dir", "--disable-pip-version-check", filepath.Join(dir, packageSpec)} - return execx.Execute("python", args...) + return execx.Execute(pythonVersion, args...) } // Case when we do not perform a forced reinstall. args := []string{"-m", "pip", "install", "-q", "--no-cache-dir", "--disable-pip-version-check", filepath.Join(dir, packageSpec)} - return execx.Execute("python", args...) + return execx.Execute(pythonVersion, args...) } } if optional { diff --git a/sdks/python/expansion-service-container/boot.go b/sdks/python/expansion-service-container/boot.go index 171e8ef62a3..90a97c35425 100644 --- a/sdks/python/expansion-service-container/boot.go +++ b/sdks/python/expansion-service-container/boot.go @@ -26,6 +26,7 @@ import ( "strconv" "strings" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx" "github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx" ) @@ -58,6 +59,10 @@ func main() { } func launchExpansionServiceProcess() error { + pythonVersion, err := expansionx.GetPythonVersion() + if err != nil { + return err + } log.Printf("Starting Python expansion service ...") dir := filepath.Join("/opt/apache/beam", venvDirectory) @@ -65,8 +70,8 @@ func launchExpansionServiceProcess() error { os.Setenv("PATH", strings.Join([]string{filepath.Join(dir, "bin"), os.Getenv("PATH")}, ":")) args := []string{"-m", expansionServiceEntrypoint, "-p", strconv.Itoa(*port), "--fully_qualified_name_glob", "*"} - if err := execx.Execute("python", args...); err != nil { - return fmt.Errorf("Could not start the expansion service: %s", err) + if err := execx.Execute(pythonVersion, args...); err != nil { + return 
fmt.Errorf("could not start the expansion service: %s", err) } return nil