This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git

commit 21b9a971186ebe5f658dfc986dfd34b73f50ff13
Author: youjin <[email protected]>
AuthorDate: Thu Dec 25 18:59:53 2025 +0800

    [build] Support Multi-Version Flink in Python
---
 python/flink_agents/api/execution_environment.py   |  30 +++-
 .../api/tests/test_version_compatibility.py        | 169 ++++++++++++++++++++
 python/flink_agents/api/version_compatibility.py   | 170 +++++++++++++++++++++
 python/pyproject.toml                              |   3 +-
 tools/build.sh                                     |  24 ++-
 tools/e2e.sh                                       |   6 +
 6 files changed, 393 insertions(+), 9 deletions(-)

diff --git a/python/flink_agents/api/execution_environment.py 
b/python/flink_agents/api/execution_environment.py
index a198e6c..e632de3 100644
--- a/python/flink_agents/api/execution_environment.py
+++ b/python/flink_agents/api/execution_environment.py
@@ -31,6 +31,7 @@ from flink_agents.api.resource import (
     ResourceType,
     SerializableResource,
 )
+from flink_agents.api.version_compatibility import flink_version_manager
 
 
 class AgentBuilder(ABC):
@@ -127,11 +128,30 @@ class AgentsExecutionEnvironment(ABC):
                 "flink_agents.runtime.local_execution_environment"
             ).create_instance(env=env, t_env=t_env, **kwargs)
         else:
-            for path in files("flink_agents.lib").iterdir():
-                env.add_jars(f"file://{path}")
-            return importlib.import_module(
-                "flink_agents.runtime.remote_execution_environment"
-            ).create_instance(env=env, t_env=t_env, **kwargs)
+            major_version = flink_version_manager.major_version
+            if major_version:
+                # Determine the version-specific lib directory
+                version_dir = f"flink-{major_version}"
+                lib_base = files("flink_agents.lib")
+                version_lib = lib_base / version_dir
+
+                # Check if version-specific directory exists
+                if version_lib.is_dir():
+                    for jar_file in version_lib.iterdir():
+                        if jar_file.is_file() and 
str(jar_file).endswith(".jar"):
+                            env.add_jars(f"file://{jar_file}")
+                else:
+                    err_msg = (
+                        f"Flink Agents dist JAR for Flink {major_version} not 
found."
+                    )
+                    raise FileNotFoundError(err_msg)
+
+                return importlib.import_module(
+                    "flink_agents.runtime.remote_execution_environment"
+                ).create_instance(env=env, t_env=t_env, **kwargs)
+            else:
+                err_msg = "Apache Flink is not installed."
+                raise ModuleNotFoundError(err_msg)
 
     @abstractmethod
     def get_config(self, path: str | None = None) -> Configuration:
diff --git a/python/flink_agents/api/tests/test_version_compatibility.py 
b/python/flink_agents/api/tests/test_version_compatibility.py
new file mode 100644
index 0000000..9583970
--- /dev/null
+++ b/python/flink_agents/api/tests/test_version_compatibility.py
@@ -0,0 +1,169 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from unittest.mock import patch
+
+from flink_agents.api.version_compatibility import (
+    FlinkVersionManager,
+    _normalize_version,
+)
+
+
+# Tests for _normalize_version function
+def test_normalize_three_part_version() -> None:  # noqa: D103
+    assert _normalize_version("1.20.3") == "1.20.3"
+    assert _normalize_version("2.2.0") == "2.2.0"
+
+
+def test_normalize_two_part_version() -> None:  # noqa: D103
+    assert _normalize_version("2.2") == "2.2.0"
+    assert _normalize_version("1.20") == "1.20.0"
+
+
+def test_normalize_version_with_suffix() -> None:  # noqa: D103
+    assert _normalize_version("2.2-SNAPSHOT") == "2.2.0"
+    assert _normalize_version("1.20.dev0") == "1.20.0"
+    assert _normalize_version("2.0.rc1") == "2.0.0"
+
+
+def test_normalize_long_version() -> None:  # noqa: D103
+    assert _normalize_version("1.20.3.4") == "1.20.3"
+    assert _normalize_version("2.2.0.1.5") == "2.2.0"
+
+
+# Tests for FlinkVersionManager class
+def test_version_property_with_flink_installed() -> None:  # noqa: D103
+    with patch("importlib.metadata.version", return_value="1.20.3"):
+        manager = FlinkVersionManager()
+        assert manager.version == "1.20.3"
+
+
+def test_version_property_without_flink_installed() -> None:  # noqa: D103
+    with patch(
+        "importlib.metadata.version", side_effect=Exception("Package not 
found")
+    ):
+        manager = FlinkVersionManager()
+        assert manager.version is None
+
+
+def test_major_version_property() -> None:  # noqa: D103
+    with patch("importlib.metadata.version", return_value="1.20.3"):
+        manager = FlinkVersionManager()
+        assert manager.major_version == "1.20"
+
+    with patch("importlib.metadata.version", return_value="2.2.0"):
+        manager = FlinkVersionManager()
+        assert manager.major_version == "2.2"
+
+
+def test_major_version_with_snapshot() -> None:  # noqa: D103
+    with patch("importlib.metadata.version", return_value="2.2.0-SNAPSHOT"):
+        manager = FlinkVersionManager()
+        assert manager.major_version == "2.2"
+
+
+def test_major_version_without_flink() -> None:  # noqa: D103
+    with patch(
+        "importlib.metadata.version", side_effect=Exception("Package not 
found")
+    ):
+        manager = FlinkVersionManager()
+        assert manager.major_version is None
+
+
+def test_ge_method() -> None:  # noqa: D103
+    with patch("importlib.metadata.version", return_value="1.20.3"):
+        manager = FlinkVersionManager()
+        assert manager.ge("1.20.0") is True
+        assert manager.ge("1.20.3") is True
+        assert manager.ge("1.21.0") is False
+
+
+def test_ge_with_two_part_version() -> None:  # noqa: D103
+    with patch("importlib.metadata.version", return_value="2.2"):
+        manager = FlinkVersionManager()
+        assert manager.ge("2.0.0") is True
+        assert manager.ge("2.2") is True
+        assert manager.ge("2.3") is False
+
+
+def test_ge_without_flink_installed() -> None:  # noqa: D103
+    with patch(
+        "importlib.metadata.version", side_effect=Exception("Package not 
found")
+    ):
+        manager = FlinkVersionManager()
+        assert manager.ge("1.20.0") is False
+
+
+def test_lt_method() -> None:  # noqa: D103
+    with patch("importlib.metadata.version", return_value="1.20.3"):
+        manager = FlinkVersionManager()
+        assert manager.lt("1.21.0") is True
+        assert manager.lt("1.20.3") is False
+        assert manager.lt("1.20.0") is False
+
+
+def test_lt_with_two_part_version() -> None:  # noqa: D103
+    with patch("importlib.metadata.version", return_value="2.2"):
+        manager = FlinkVersionManager()
+        assert manager.lt("2.3") is True
+        assert manager.lt("2.2") is False
+        assert manager.lt("2.0") is False
+
+
+def test_lt_without_flink_installed() -> None:  # noqa: D103
+    with patch(
+        "importlib.metadata.version", side_effect=Exception("Package not 
found")
+    ):
+        manager = FlinkVersionManager()
+        assert manager.lt("2.0.0") is False
+
+
+def test_lazy_initialization() -> None:  # noqa: D103
+    with patch("importlib.metadata.version", return_value="1.20.3") as 
mock_version:
+        manager = FlinkVersionManager()
+        # Version should not be fetched yet
+        assert not manager._initialized
+        mock_version.assert_not_called()
+
+        # First access triggers initialization
+        _ = manager.version
+        assert manager._initialized
+        mock_version.assert_called_once()
+
+        # Second access should use cached value
+        _ = manager.version
+        mock_version.assert_called_once()  # Still called only once
+
+
+def test_version_comparison_with_snapshot_versions() -> None:  # noqa: D103
+    with patch("importlib.metadata.version", return_value="2.2-SNAPSHOT"):
+        manager = FlinkVersionManager()
+        assert manager.ge("2.2.0") is True
+        assert manager.ge("2.1.0") is True
+        assert manager.lt("2.3.0") is True
+
+
+def test_version_comparison_edge_cases() -> None:  # noqa: D103
+    # Test boundary versions
+    with patch("importlib.metadata.version", return_value="1.20.3"):
+        manager = FlinkVersionManager()
+        assert manager.ge("1.20.2") is True
+        assert manager.ge("1.20.3") is True
+        assert manager.ge("1.20.4") is False
+        assert manager.lt("1.20.4") is True
+        assert manager.lt("1.20.3") is False
+        assert manager.ge("2.0.0") is False
diff --git a/python/flink_agents/api/version_compatibility.py 
b/python/flink_agents/api/version_compatibility.py
new file mode 100644
index 0000000..4060c40
--- /dev/null
+++ b/python/flink_agents/api/version_compatibility.py
@@ -0,0 +1,170 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+
+from packaging import version
+
+
+def _normalize_version(version_str: str) -> str:
+    """Normalize version string to standard format.
+
+    Handles various version formats and normalizes them to a three-part version
+    string (major.minor.patch). If only two parts are provided, appends '.0' as
+    the patch version.
+
+    Args:
+        version_str: The version string to normalize (e.g., "2.2", "1.20.3",
+                    "2.2.0-SNAPSHOT", "1.20.dev0", "2.0.rc1")
+
+    Returns:
+        str: Normalized version string in format "major.minor.patch"
+    """
+    # Remove any version suffix with hyphen (e.g., -SNAPSHOT, -dev)
+    base_version = version_str.split('-')[0]
+
+    # Split by dot and keep only numeric parts
+    parts = []
+    for part in base_version.split('.'):
+        # Only keep parts that are purely numeric
+        if part.isdigit():
+            parts.append(part)
+        # Stop if we encounter a non-numeric part (e.g., 'dev0', 'rc1')
+        else:
+            break
+
+    # Ensure we have at least three parts (major.minor.patch)
+    while len(parts) < 3:
+        parts.append('0')
+
+    return '.'.join(parts[:3])
+
+
+class FlinkVersionManager:
+    """Manager for Apache Flink version compatibility checks.
+
+    This class provides lazy initialization and caching of the installed Flink
+    version, along with utility methods for version comparison. It uses a 
singleton
+    pattern through the global flink_version_manager instance.
+
+    The version information is fetched only once when first accessed, improving
+    startup performance and avoiding repeated package queries.
+
+    Attributes:
+        _flink_version: Cached version string of the installed apache-flink 
package
+        _initialized: Flag indicating whether version has been fetched
+    """
+
+    def __init__(self) -> None:
+        """Initialize the FlinkVersionManager with uninitialized state."""
+        self._flink_version = None
+        self._initialized = False
+
+    def _initialize(self) -> None:
+        """Perform lazy initialization of the Flink version.
+
+        This method is called automatically when version information is first
+        accessed. It fetches the version once and caches it for subsequent 
calls.
+        """
+        if self._initialized:
+            return
+
+        # Attempt to retrieve the version from installed packages
+        self._flink_version = self._get_pyflink_version()
+        self._initialized = True
+
+    def _get_pyflink_version(self) -> str | None:
+        """Retrieve the version of the installed apache-flink package.
+
+        Uses importlib.metadata to query the package version. This method 
handles
+        cases where the package is not installed or cannot be queried.
+
+        Returns:
+            Optional[str]: The version string if apache-flink is installed,
+                          None otherwise
+        """
+        try:
+            from importlib.metadata import version as get_version
+            return get_version("apache-flink")
+        except Exception:
+            return None
+
+    @property
+    def version(self) -> str | None:
+        """Get the full version string of the installed Flink.
+
+        Returns:
+            Optional[str]: Full version string (e.g., "1.20.3", "2.2.0") or 
None
+                          if apache-flink is not installed
+        """
+        self._initialize()
+        return self._flink_version
+
+    @property
+    def major_version(self) -> str | None:
+        """Get the major version number (major.minor) of the installed Flink.
+
+        Extracts the first two version components from the full version string,
+        which is useful for feature compatibility checks between major 
releases.
+
+        Returns:
+            Optional[str]: Major version string (e.g., "2.2", "1.20") or None
+                          if apache-flink is not installed
+        """
+        if not self.version:
+            return None
+
+        # Extract major.minor from full version string
+        # Examples: "2.2.0" -> "2.2", "1.20.3" -> "1.20", "2.2.0-SNAPSHOT" -> 
"2.2"
+        version_parts = self.version.split('-')[0].split('.')
+        if len(version_parts) >= 2:
+            return f"{version_parts[0]}.{version_parts[1]}"
+        return self.version
+
+    def ge(self, target_version: str) -> bool:
+        """Check if the installed Flink version is greater than or equal to 
the target.
+
+        Args:
+            target_version: The minimum version to compare against (e.g., 
"1.20.0")
+
+        Returns:
+            bool: True if installed version >= target version, False otherwise
+                  (including when Flink is not installed)
+        """
+        if not self.version:
+            return False
+
+        current = _normalize_version(self.version)
+        target = _normalize_version(target_version)
+        return version.parse(current) >= version.parse(target)
+
+    def lt(self, target_version: str) -> bool:
+        """Check if the installed Flink version is less than the target.
+
+        Args:
+            target_version: The version threshold to compare against (e.g., 
"2.0.0")
+
+        Returns:
+            bool: True if installed version < target version, False otherwise
+                  (including when Flink is not installed)
+        """
+        if not self.version:
+            return False
+        return not self.ge(target_version)
+
+
+# Global singleton instance for Flink version management
+flink_version_manager = FlinkVersionManager()
diff --git a/python/pyproject.toml b/python/pyproject.toml
index c9c12af..a4a17a9 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -42,7 +42,6 @@ classifiers = [
 ]
 
 dependencies = [
-    "apache-flink==2.2.0",
     "pydantic==2.11.4",
     "docstring-parser==0.16",
     "pyyaml==6.0.2",
@@ -64,7 +63,7 @@ include-package-data = true
 where = [""]
 
 [tool.setuptools.package-data]
-"flink_agents.lib" = ["*.jar"]
+"flink_agents.lib" = ["**/*.jar"]
 
 # Optional dependencies (dependency groups)
 [project.optional-dependencies]
diff --git a/tools/build.sh b/tools/build.sh
index 435ce56..63f3ecd 100755
--- a/tools/build.sh
+++ b/tools/build.sh
@@ -46,13 +46,33 @@ if $build_java; then
 fi
 
 if $build_python; then
-  # copy flink-agents-dist jar to python lib
+  # copy flink-agents-dist jars to python lib with version subdirectories
   PYTHON_LIB_DIR=${PROJECT_ROOT}/python/flink_agents/lib
   rm -rf ${PYTHON_LIB_DIR}
   mkdir -p ${PYTHON_LIB_DIR}
 
   PROJECT_VERSION=$(sed -n 's/.*<version>\(.*\)<\/version>.*/\1/p' pom.xml | 
head -n 2 | tail -n 1)
-  cp "${PROJECT_ROOT}/dist/target/flink-agents-dist-${PROJECT_VERSION}.jar" 
${PYTHON_LIB_DIR}
+
+  # Automatically detect and copy all Flink version JARs from dist 
subdirectories
+  DIST_DIR="${PROJECT_ROOT}/dist"
+  for version_dir in "${DIST_DIR}"/flink-*; do
+    if [ -d "$version_dir" ]; then
+      version_name=$(basename "$version_dir")
+      echo "Processing $version_name..."
+
+      # Create corresponding lib subdirectory
+      mkdir -p "${PYTHON_LIB_DIR}/${version_name}"
+
+      # Find and copy the JAR file
+      
jar_file="${version_dir}/target/flink-agents-dist-${version_name}-${PROJECT_VERSION}.jar"
+      if [ -f "$jar_file" ]; then
+        cp "$jar_file" "${PYTHON_LIB_DIR}/${version_name}/"
+        echo "  Copied: 
flink-agents-dist-${version_name}-${PROJECT_VERSION}.jar"
+      else
+        echo "  Warning: JAR file not found at $jar_file"
+      fi
+    fi
+  done
 
   # build python
   cd python
diff --git a/tools/e2e.sh b/tools/e2e.sh
index d48f18e..8e29aa2 100755
--- a/tools/e2e.sh
+++ b/tools/e2e.sh
@@ -14,6 +14,9 @@
 #   See the License for the specific language governing permissions and
 #  limitations under the License.
 #
+
+DEFAULT_FLINK_VERSION="2.2"
+
 function run_test {
   local description="$1"
   local command="$2"
@@ -140,6 +143,9 @@ if [[ ! -d "$python_dir" ]]; then
   exit 1
 fi
 
+cd "$python_dir"
+uv pip install apache-flink~=${DEFAULT_FLINK_VERSION}.0
+
 run_test "Resource Cross-Language end-to-end test in Java" 
"run_resource_cross_language_test_in_java"
 run_test "Resource Cross-Language end-to-end test in Python" 
"run_resource_cross_language_test_in_python"
 run_test "Agent plan compatibility end-to-end test" 
"run_agent_plan_compatibility_test"

Reply via email to