This is an automated email from the ASF dual-hosted git repository.

riteshghorse pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 395c4d15bb7 [Python] Get available python version and use it for Python SDK harness boot entry point (#28046)
395c4d15bb7 is described below

commit 395c4d15bb74351b0aa020dc7463de8d85766e07
Author: Ritesh Ghorse <riteshgho...@gmail.com>
AuthorDate: Wed Aug 23 13:19:49 2023 -0400

    [Python] Get available python version and use it for Python SDK harness boot entry point (#28046)
    
    * add PYTHON_PATH environment variable override, use GetPythonVersion
    
    * fix error message
    
    * add spaces
---
 .../core/runtime/xlangx/expansionx/download.go     | 15 ++++++++--
 .../runtime/xlangx/expansionx/download_test.go     | 32 ++++++++++++++++++++++
 sdks/python/container/boot.go                      | 13 +++++++--
 sdks/python/container/piputil.go                   | 19 +++++++++----
 sdks/python/expansion-service-container/boot.go    |  9 ++++--
 5 files changed, 76 insertions(+), 12 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go 
b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go
index af3b495720b..e5fff103967 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go
@@ -382,20 +382,29 @@ func jarExists(jarPath string) bool {
        return err == nil
 }
 
-func getPythonVersion() (string, error) {
+// GetPythonVersion returns the Python interpreter to use. It checks the
+// PYTHON_PATH environment variable and returns its value if set.
+// Otherwise it returns the first of python or python3 whose --version
+// invocation succeeds, or an error if neither is available.
+func GetPythonVersion() (string, error) {
+       if pythonPath := os.Getenv("PYTHON_PATH"); pythonPath != "" {
+               return pythonPath, nil
+       }
        for _, v := range []string{"python", "python3"} {
                cmd := exec.Command(v, "--version")
                if err := cmd.Run(); err == nil {
                        return v, nil
                }
        }
-       return "", fmt.Errorf("no python installation found")
+       return "", errors.New("no python installation found. If you use a " +
+               "custom container image, please check if python/python3 is 
available or specify the " +
+               "full path to the python interpreter in PYTHON_PATH environment 
variable")
 }
 
 // SetUpPythonEnvironment sets up the virtual ennvironment required for the
 // Apache Beam Python SDK to run an expansion service module.
 func SetUpPythonEnvironment(extraPackage string) (string, error) {
-       py, err := getPythonVersion()
+       py, err := GetPythonVersion()
        if err != nil {
                return "", fmt.Errorf("no python installation found: %v", err)
        }
diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download_test.go 
b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download_test.go
index 9779c236189..65e72342a9b 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download_test.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download_test.go
@@ -217,3 +217,35 @@ func TestGetJar_dev(t *testing.T) {
                t.Errorf("error message does not contain gradle command %v for 
user, got message: %v", gradleTarget, err)
        }
 }
+
+func TestGetPythonVersion(t *testing.T) {
+       tests := []struct {
+               name        string
+               PYTHON_PATH string
+       }{
+               {
+                       name:        "PYTHON_PATH set",
+                       PYTHON_PATH: "/bin/python",
+               },
+               {
+                       name:        "PYTHON_PATH not set",
+                       PYTHON_PATH: "",
+               },
+       }
+
+       for _, test := range tests {
+               if test.PYTHON_PATH != "" {
+                       os.Setenv("PYTHON_PATH", test.PYTHON_PATH)
+               }
+               pythonVersion, err := GetPythonVersion()
+               if err != nil {
+                       t.Errorf("python installation not found: %v, when 
PYTHON_PATH=%v", err, test.PYTHON_PATH)
+               }
+               if test.PYTHON_PATH != "" && pythonVersion != test.PYTHON_PATH {
+                       t.Errorf("incorrect PYTHON_PATH, want: %v, got: %v", 
test.PYTHON_PATH, pythonVersion)
+               }
+               if test.PYTHON_PATH != "" {
+                       os.Unsetenv(test.PYTHON_PATH)
+               }
+       }
+}
diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index c67239f0564..e7b11daa397 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -36,6 +36,7 @@ import (
 
        "github.com/apache/beam/sdks/v2/go/container/tools"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
+       
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx"
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
@@ -91,7 +92,11 @@ func main() {
                        "--container_executable=/opt/apache/beam/boot",
                }
                log.Printf("Starting worker pool %v: python %v", workerPoolId, 
strings.Join(args, " "))
-               if err := execx.Execute("python", args...); err != nil {
+               pythonVersion, err := expansionx.GetPythonVersion()
+               if err != nil {
+                       log.Fatalf("Python SDK worker pool exited with error: 
%v", err)
+               }
+               if err := execx.Execute(pythonVersion, args...); err != nil {
                        log.Fatalf("Python SDK worker pool exited with error: 
%v", err)
                }
                log.Print("Python SDK worker pool exited.")
@@ -336,7 +341,11 @@ func setupVenv(ctx context.Context, logger *tools.Logger, 
baseDir, workerId stri
        if err := os.MkdirAll(dir, 0750); err != nil {
                return "", fmt.Errorf("failed to create Python venv directory: 
%s", err)
        }
-       if err := execx.Execute("python", "-m", "venv", 
"--system-site-packages", dir); err != nil {
+       pythonVersion, err := expansionx.GetPythonVersion()
+       if err != nil {
+               return "", err
+       }
+       if err := execx.Execute(pythonVersion, "-m", "venv", 
"--system-site-packages", dir); err != nil {
                return "", fmt.Errorf("python venv initialization failed: %s", 
err)
        }
 
diff --git a/sdks/python/container/piputil.go b/sdks/python/container/piputil.go
index 720bf372c53..350bda049d9 100644
--- a/sdks/python/container/piputil.go
+++ b/sdks/python/container/piputil.go
@@ -26,11 +26,16 @@ import (
        "path/filepath"
        "strings"
 
+       
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
 )
 
 // pipInstallRequirements installs the given requirement, if present.
 func pipInstallRequirements(files []string, dir, name string) error {
+       pythonVersion, err := expansionx.GetPythonVersion()
+       if err != nil {
+               return err
+       }
        for _, file := range files {
                if file == name {
                        // We run the install process in two rounds in order to 
avoid as much
@@ -38,14 +43,14 @@ func pipInstallRequirements(files []string, dir, name 
string) error {
                        // option will make sure that only things staged in the 
worker will be
                        // used without following their dependencies.
                        args := []string{"-m", "pip", "install", "-q", "-r", 
filepath.Join(dir, name), "--no-cache-dir", "--disable-pip-version-check", 
"--no-index", "--no-deps", "--find-links", dir}
-                       if err := execx.Execute("python", args...); err != nil {
+                       if err := execx.Execute(pythonVersion, args...); err != 
nil {
                                fmt.Println("Some packages could not be 
installed solely from the requirements cache. Installing packages from PyPI.")
                        }
                        // The second install round opens up the search for 
packages on PyPI and
                        // also installs dependencies. The key is that if all 
the packages have
                        // been installed in the first round then this command 
will be a no-op.
                        args = []string{"-m", "pip", "install", "-q", "-r", 
filepath.Join(dir, name), "--no-cache-dir", "--disable-pip-version-check", 
"--find-links", dir}
-                       return execx.Execute("python", args...)
+                       return execx.Execute(pythonVersion, args...)
                }
        }
        return nil
@@ -65,6 +70,10 @@ func isPackageInstalled(pkgName string) bool {
 
 // pipInstallPackage installs the given package, if present.
 func pipInstallPackage(files []string, dir, name string, force, optional bool, 
extras []string) error {
+       pythonVersion, err := expansionx.GetPythonVersion()
+       if err != nil {
+               return err
+       }
        for _, file := range files {
                if file == name {
                        var packageSpec = name
@@ -90,17 +99,17 @@ func pipInstallPackage(files []string, dir, name string, 
force, optional bool, e
                                // installed if necessary.  This achieves our 
goal outlined above.
                                args := []string{"-m", "pip", "install", "-q", 
"--no-cache-dir", "--disable-pip-version-check", "--upgrade", 
"--force-reinstall", "--no-deps",
                                        filepath.Join(dir, packageSpec)}
-                               err := execx.Execute("python", args...)
+                               err := execx.Execute(pythonVersion, args...)
                                if err != nil {
                                        return err
                                }
                                args = []string{"-m", "pip", "install", "-q", 
"--no-cache-dir", "--disable-pip-version-check", filepath.Join(dir, 
packageSpec)}
-                               return execx.Execute("python", args...)
+                               return execx.Execute(pythonVersion, args...)
                        }
 
                        // Case when we do not perform a forced reinstall.
                        args := []string{"-m", "pip", "install", "-q", 
"--no-cache-dir", "--disable-pip-version-check", filepath.Join(dir, 
packageSpec)}
-                       return execx.Execute("python", args...)
+                       return execx.Execute(pythonVersion, args...)
                }
        }
        if optional {
diff --git a/sdks/python/expansion-service-container/boot.go 
b/sdks/python/expansion-service-container/boot.go
index 171e8ef62a3..90a97c35425 100644
--- a/sdks/python/expansion-service-container/boot.go
+++ b/sdks/python/expansion-service-container/boot.go
@@ -26,6 +26,7 @@ import (
        "strconv"
        "strings"
 
+       
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
 )
 
@@ -58,6 +59,10 @@ func main() {
 }
 
 func launchExpansionServiceProcess() error {
+       pythonVersion, err := expansionx.GetPythonVersion()
+       if err != nil {
+               return err
+       }
        log.Printf("Starting Python expansion service ...")
 
        dir := filepath.Join("/opt/apache/beam", venvDirectory)
@@ -65,8 +70,8 @@ func launchExpansionServiceProcess() error {
        os.Setenv("PATH", strings.Join([]string{filepath.Join(dir, "bin"), 
os.Getenv("PATH")}, ":"))
 
        args := []string{"-m", expansionServiceEntrypoint, "-p", 
strconv.Itoa(*port), "--fully_qualified_name_glob", "*"}
-       if err := execx.Execute("python", args...); err != nil {
-               return fmt.Errorf("Could not start the expansion service: %s", 
err)
+       if err := execx.Execute(pythonVersion, args...); err != nil {
+               return fmt.Errorf("could not start the expansion service: %s", 
err)
        }
 
        return nil

Reply via email to