This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new eac11fed1d4 Simply wheel validation logic. (#37687)
eac11fed1d4 is described below
commit eac11fed1d45bdfd50c9499b8c4a75f36dfbdfe7
Author: tvalentyn <[email protected]>
AuthorDate: Tue Feb 24 09:32:26 2026 -0800
Simply wheel validation logic. (#37687)
---
sdks/python/container/boot.go | 27 +--------------------------
sdks/python/container/piputil.go | 39 +++++++++++++++++++++++++++++----------
2 files changed, 30 insertions(+), 36 deletions(-)
diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index 85e5b07a121..7c0f22675da 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -29,7 +29,6 @@ import (
"os/exec"
"os/signal"
"path/filepath"
- "regexp"
"slices"
"strings"
"sync"
@@ -47,8 +46,6 @@ import (
)
var (
- acceptableWhlSpecs []string
-
// SetupOnly option is used to invoke the boot sequence to only process the provided artifacts and builds new dependency pre-cached images.
setupOnly = flag.Bool("setup_only", false, "Execute boot program in setup only mode (optional).")
artifacts = flag.String("artifacts", "", "Path to artifacts metadata file used in setup only mode (optional).")
@@ -406,37 +403,15 @@ func setupVenv(ctx context.Context, logger *tools.Logger, baseDir, workerId stri
return dir, nil
}
-// setupAcceptableWheelSpecs setup wheel specs according to installed python version
-func setupAcceptableWheelSpecs() error {
- cmd := exec.Command("python", "-V")
- stdoutStderr, err := cmd.CombinedOutput()
- if err != nil {
- return err
- }
- re := regexp.MustCompile(`Python (\d)\.(\d+).*`)
- pyVersions := re.FindStringSubmatch(string(stdoutStderr[:]))
- if len(pyVersions) != 3 {
- return fmt.Errorf("cannot get parse Python version from %s", stdoutStderr)
- }
- pyVersion := fmt.Sprintf("%s%s", pyVersions[1], pyVersions[2])
- wheelName := fmt.Sprintf("cp%s-cp%s-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", pyVersion, pyVersion)
- acceptableWhlSpecs = append(acceptableWhlSpecs, wheelName)
- return nil
-}
-
// installSetupPackages installs Beam SDK and user dependencies.
func installSetupPackages(ctx context.Context, logger *tools.Logger, files []string, workDir string, requirementsFiles []string) error {
bufLogger := tools.NewBufferedLogger(logger)
bufLogger.Printf(ctx, "Installing setup packages ...")
- if err := setupAcceptableWheelSpecs(); err != nil {
- bufLogger.Printf(ctx, "Failed to setup acceptable wheel specs, leave it as empty: %v", err)
- }
-
// Install the Dataflow Python SDK if one was staged. In released
// container images, SDK is already installed, but can be overriden
// using the --sdk_location pipeline option.
- if err := installSdk(ctx, logger, files, workDir, sdkSrcFile, acceptableWhlSpecs, false); err != nil {
+ if err := installSdk(ctx, logger, files, workDir, sdkSrcFile, false); err != nil {
return fmt.Errorf("failed to install SDK: %v", err)
}
pkgName := "apache-beam"
diff --git a/sdks/python/container/piputil.go b/sdks/python/container/piputil.go
index 1faf8421a02..2024c16dde5 100644
--- a/sdks/python/container/piputil.go
+++ b/sdks/python/container/piputil.go
@@ -24,6 +24,7 @@ import (
"os"
"os/exec"
"path/filepath"
+ "regexp"
"strings"
"time"
@@ -191,16 +192,34 @@ func installExtraPackages(ctx context.Context, logger *tools.Logger, files []str
return nil
}
-func findBeamSdkWhl(ctx context.Context, logger *tools.Logger, files []string, acceptableWhlSpecs []string) string {
+// getPythonVersionSpec returns the Python version specifier (e.g., "310" for Python 3.10)
+func getPythonVersionSpec() (string, error) {
+ cmd := exec.Command("python", "-V")
+ stdoutStderr, err := cmd.CombinedOutput()
+ if err != nil {
+ return "", err
+ }
+ re := regexp.MustCompile(`Python (\d)\.(\d+).*`)
+ pyVersions := re.FindStringSubmatch(string(stdoutStderr[:]))
+ if len(pyVersions) != 3 {
+ return "", fmt.Errorf("cannot parse Python version from %s", stdoutStderr)
+ }
+ return fmt.Sprintf("%s%s", pyVersions[1], pyVersions[2]), nil
+}
+
+func findBeamSdkWhl(ctx context.Context, logger *tools.Logger, files []string) string {
bufLogger := tools.NewBufferedLoggerWithFlushInterval(ctx, logger, pipLogFlushInterval)
+
+ pyVersionSpec, err := getPythonVersionSpec()
+ if err != nil {
+ bufLogger.Printf(ctx, "Failed to get Python version specifier: %v", err)
+ return ""
+ }
+
for _, file := range files {
- if strings.HasPrefix(file, "apache_beam") {
- for _, s := range acceptableWhlSpecs {
- if strings.HasSuffix(file, s) {
- bufLogger.Printf(ctx, "Found Apache Beam SDK wheel: %v", file)
- return file
- }
- }
+ if strings.HasPrefix(file, "apache_beam") && strings.HasSuffix(file, ".whl") && strings.Contains(file, "cp"+pyVersionSpec) {
+ bufLogger.Printf(ctx, "Found Apache Beam SDK wheel: %v", file)
+ return file
}
}
return ""
@@ -211,8 +230,8 @@ func findBeamSdkWhl(ctx context.Context, logger *tools.Logger, files []string, a
// assume that the pipleine was started with the Beam SDK found in the wheel
// file, and we try to install it. If not successful, we fall back to installing
// SDK from source tarball provided in sdkSrcFile.
-func installSdk(ctx context.Context, logger *tools.Logger, files []string, workDir string, sdkSrcFile string, acceptableWhlSpecs []string, required bool) error {
- sdkWhlFile := findBeamSdkWhl(ctx, logger, files, acceptableWhlSpecs)
+func installSdk(ctx context.Context, logger *tools.Logger, files []string, workDir string, sdkSrcFile string, required bool) error {
+ sdkWhlFile := findBeamSdkWhl(ctx, logger, files)
bufLogger := tools.NewBufferedLoggerWithFlushInterval(ctx, logger, pipLogFlushInterval)
if sdkWhlFile != "" {
// by default, pip rejects to install wheel if same version already installed