This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-agents.git
commit 21b9a971186ebe5f658dfc986dfd34b73f50ff13 Author: youjin <[email protected]> AuthorDate: Thu Dec 25 18:59:53 2025 +0800 [build] Support Multi-Version Flink in Python --- python/flink_agents/api/execution_environment.py | 30 +++- .../api/tests/test_version_compatibility.py | 169 ++++++++++++++++++++ python/flink_agents/api/version_compatibility.py | 170 +++++++++++++++++++++ python/pyproject.toml | 3 +- tools/build.sh | 24 ++- tools/e2e.sh | 6 + 6 files changed, 393 insertions(+), 9 deletions(-) diff --git a/python/flink_agents/api/execution_environment.py b/python/flink_agents/api/execution_environment.py index a198e6c..e632de3 100644 --- a/python/flink_agents/api/execution_environment.py +++ b/python/flink_agents/api/execution_environment.py @@ -31,6 +31,7 @@ from flink_agents.api.resource import ( ResourceType, SerializableResource, ) +from flink_agents.api.version_compatibility import flink_version_manager class AgentBuilder(ABC): @@ -127,11 +128,30 @@ class AgentsExecutionEnvironment(ABC): "flink_agents.runtime.local_execution_environment" ).create_instance(env=env, t_env=t_env, **kwargs) else: - for path in files("flink_agents.lib").iterdir(): - env.add_jars(f"file://{path}") - return importlib.import_module( - "flink_agents.runtime.remote_execution_environment" - ).create_instance(env=env, t_env=t_env, **kwargs) + major_version = flink_version_manager.major_version + if major_version: + # Determine the version-specific lib directory + version_dir = f"flink-{major_version}" + lib_base = files("flink_agents.lib") + version_lib = lib_base / version_dir + + # Check if version-specific directory exists + if version_lib.is_dir(): + for jar_file in version_lib.iterdir(): + if jar_file.is_file() and str(jar_file).endswith(".jar"): + env.add_jars(f"file://{jar_file}") + else: + err_msg = ( + f"Flink Agents dist JAR for Flink {major_version} not found." + ) + raise FileNotFoundError(err_msg) + + return importlib.import_module( + "flink_agents.runtime.remote_execution_environment" + ).create_instance(env=env, t_env=t_env, **kwargs) + else: + err_msg = "Apache Flink is not installed." + raise ModuleNotFoundError(err_msg) @abstractmethod def get_config(self, path: str | None = None) -> Configuration: diff --git a/python/flink_agents/api/tests/test_version_compatibility.py b/python/flink_agents/api/tests/test_version_compatibility.py new file mode 100644 index 0000000..9583970 --- /dev/null +++ b/python/flink_agents/api/tests/test_version_compatibility.py @@ -0,0 +1,169 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +from unittest.mock import patch + +from flink_agents.api.version_compatibility import ( + FlinkVersionManager, + _normalize_version, +) + + +# Tests for _normalize_version function +def test_normalize_three_part_version() -> None: # noqa: D103 + assert _normalize_version("1.20.3") == "1.20.3" + assert _normalize_version("2.2.0") == "2.2.0" + + +def test_normalize_two_part_version() -> None: # noqa: D103 + assert _normalize_version("2.2") == "2.2.0" + assert _normalize_version("1.20") == "1.20.0" + + +def test_normalize_version_with_suffix() -> None: # noqa: D103 + assert _normalize_version("2.2-SNAPSHOT") == "2.2.0" + assert _normalize_version("1.20.dev0") == "1.20.0" + assert _normalize_version("2.0.rc1") == "2.0.0" + + +def test_normalize_long_version() -> None: # noqa: D103 + assert _normalize_version("1.20.3.4") == "1.20.3" + assert _normalize_version("2.2.0.1.5") == "2.2.0" + + +# Tests for FlinkVersionManager class +def test_version_property_with_flink_installed() -> None: # noqa: D103 + with patch("importlib.metadata.version", return_value="1.20.3"): + manager = FlinkVersionManager() + assert manager.version == "1.20.3" + + +def test_version_property_without_flink_installed() -> None: # noqa: D103 + with patch( + "importlib.metadata.version", side_effect=Exception("Package not found") + ): + manager = FlinkVersionManager() + assert manager.version is None + + +def test_major_version_property() -> None: # noqa: D103 + with patch("importlib.metadata.version", return_value="1.20.3"): + manager = FlinkVersionManager() + assert manager.major_version == "1.20" + + with patch("importlib.metadata.version", return_value="2.2.0"): + manager = FlinkVersionManager() + assert manager.major_version == "2.2" + + +def test_major_version_with_snapshot() -> None: # noqa: D103 + with patch("importlib.metadata.version", return_value="2.2.0-SNAPSHOT"): + manager = FlinkVersionManager() + assert manager.major_version == "2.2" + + +def test_major_version_without_flink() -> None: # noqa: D103 + with patch( + "importlib.metadata.version", side_effect=Exception("Package not found") + ): + manager = FlinkVersionManager() + assert manager.major_version is None + + +def test_ge_method() -> None: # noqa: D103 + with patch("importlib.metadata.version", return_value="1.20.3"): + manager = FlinkVersionManager() + assert manager.ge("1.20.0") is True + assert manager.ge("1.20.3") is True + assert manager.ge("1.21.0") is False + + +def test_ge_with_two_part_version() -> None: # noqa: D103 + with patch("importlib.metadata.version", return_value="2.2"): + manager = FlinkVersionManager() + assert manager.ge("2.0.0") is True + assert manager.ge("2.2") is True + assert manager.ge("2.3") is False + + +def test_ge_without_flink_installed() -> None: # noqa: D103 + with patch( + "importlib.metadata.version", side_effect=Exception("Package not found") + ): + manager = FlinkVersionManager() + assert manager.ge("1.20.0") is False + + +def test_lt_method() -> None: # noqa: D103 + with patch("importlib.metadata.version", return_value="1.20.3"): + manager = FlinkVersionManager() + assert manager.lt("1.21.0") is True + assert manager.lt("1.20.3") is False + assert manager.lt("1.20.0") is False + + +def test_lt_with_two_part_version() -> None: # noqa: D103 + with patch("importlib.metadata.version", return_value="2.2"): + manager = FlinkVersionManager() + assert manager.lt("2.3") is True + assert manager.lt("2.2") is False + assert manager.lt("2.0") is False + + +def test_lt_without_flink_installed() -> None: # noqa: D103 + with patch( + "importlib.metadata.version", side_effect=Exception("Package not found") + ): + manager = FlinkVersionManager() + assert manager.lt("2.0.0") is False + + +def test_lazy_initialization() -> None: # noqa: D103 + with patch("importlib.metadata.version", return_value="1.20.3") as mock_version: + manager = FlinkVersionManager() + # Version should not be fetched yet + assert not manager._initialized + mock_version.assert_not_called() + + # First access triggers initialization + _ = manager.version + assert manager._initialized + mock_version.assert_called_once() + + # Second access should use cached value + _ = manager.version + mock_version.assert_called_once() # Still called only once + + +def test_version_comparison_with_snapshot_versions() -> None: # noqa: D103 + with patch("importlib.metadata.version", return_value="2.2-SNAPSHOT"): + manager = FlinkVersionManager() + assert manager.ge("2.2.0") is True + assert manager.ge("2.1.0") is True + assert manager.lt("2.3.0") is True + + +def test_version_comparison_edge_cases() -> None: # noqa: D103 + # Test boundary versions + with patch("importlib.metadata.version", return_value="1.20.3"): + manager = FlinkVersionManager() + assert manager.ge("1.20.2") is True + assert manager.ge("1.20.3") is True + assert manager.ge("1.20.4") is False + assert manager.lt("1.20.4") is True + assert manager.lt("1.20.3") is False + assert manager.ge("2.0.0") is False diff --git a/python/flink_agents/api/version_compatibility.py b/python/flink_agents/api/version_compatibility.py new file mode 100644 index 0000000..4060c40 --- /dev/null +++ b/python/flink_agents/api/version_compatibility.py @@ -0,0 +1,170 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# + +from packaging import version + + +def _normalize_version(version_str: str) -> str: + """Normalize version string to standard format. + + Handles various version formats and normalizes them to a three-part version + string (major.minor.patch). If only two parts are provided, appends '.0' as + the patch version. + + Args: + version_str: The version string to normalize (e.g., "2.2", "1.20.3", + "2.2.0-SNAPSHOT", "1.20.dev0", "2.0.rc1") + + Returns: + str: Normalized version string in format "major.minor.patch" + """ + # Remove any version suffix with hyphen (e.g., -SNAPSHOT, -dev) + base_version = version_str.split('-')[0] + + # Split by dot and keep only numeric parts + parts = [] + for part in base_version.split('.'): + # Only keep parts that are purely numeric + if part.isdigit(): + parts.append(part) + # Stop if we encounter a non-numeric part (e.g., 'dev0', 'rc1') + else: + break + + # Ensure we have at least three parts (major.minor.patch) + while len(parts) < 3: + parts.append('0') + + return '.'.join(parts[:3]) + + +class FlinkVersionManager: + """Manager for Apache Flink version compatibility checks. + + This class provides lazy initialization and caching of the installed Flink + version, along with utility methods for version comparison. It uses a singleton + pattern through the global flink_version_manager instance. + + The version information is fetched only once when first accessed, improving + startup performance and avoiding repeated package queries. + + Attributes: + _flink_version: Cached version string of the installed apache-flink package + _initialized: Flag indicating whether version has been fetched + """ + + def __init__(self) -> None: + """Initialize the FlinkVersionManager with uninitialized state.""" + self._flink_version = None + self._initialized = False + + def _initialize(self) -> None: + """Perform lazy initialization of the Flink version. + + This method is called automatically when version information is first + accessed. It fetches the version once and caches it for subsequent calls. + """ + if self._initialized: + return + + # Attempt to retrieve the version from installed packages + self._flink_version = self._get_pyflink_version() + self._initialized = True + + def _get_pyflink_version(self) -> str | None: + """Retrieve the version of the installed apache-flink package. + + Uses importlib.metadata to query the package version. This method handles + cases where the package is not installed or cannot be queried. + + Returns: + Optional[str]: The version string if apache-flink is installed, + None otherwise + """ + try: + from importlib.metadata import version as get_version + return get_version("apache-flink") + except Exception: + return None + + @property + def version(self) -> str | None: + """Get the full version string of the installed Flink. + + Returns: + Optional[str]: Full version string (e.g., "1.20.3", "2.2.0") or None + if apache-flink is not installed + """ + self._initialize() + return self._flink_version + + @property + def major_version(self) -> str | None: + """Get the major version number (major.minor) of the installed Flink. + + Extracts the first two version components from the full version string, + which is useful for feature compatibility checks between major releases. + + Returns: + Optional[str]: Major version string (e.g., "2.2", "1.20") or None + if apache-flink is not installed + """ + if not self.version: + return None + + # Extract major.minor from full version string + # Examples: "2.2.0" -> "2.2", "1.20.3" -> "1.20", "2.2.0-SNAPSHOT" -> "2.2" + version_parts = self.version.split('-')[0].split('.') + if len(version_parts) >= 2: + return f"{version_parts[0]}.{version_parts[1]}" + return self.version + + def ge(self, target_version: str) -> bool: + """Check if the installed Flink version is greater than or equal to the target. + + Args: + target_version: The minimum version to compare against (e.g., "1.20.0") + + Returns: + bool: True if installed version >= target version, False otherwise + (including when Flink is not installed) + """ + if not self.version: + return False + + current = _normalize_version(self.version) + target = _normalize_version(target_version) + return version.parse(current) >= version.parse(target) + + def lt(self, target_version: str) -> bool: + """Check if the installed Flink version is less than the target. + + Args: + target_version: The version threshold to compare against (e.g., "2.0.0") + + Returns: + bool: True if installed version < target version, False otherwise + (including when Flink is not installed) + """ + if not self.version: + return False + return not self.ge(target_version) + + +# Global singleton instance for Flink version management +flink_version_manager = FlinkVersionManager() diff --git a/python/pyproject.toml b/python/pyproject.toml index c9c12af..a4a17a9 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -42,7 +42,6 @@ classifiers = [ ] dependencies = [ - "apache-flink==2.2.0", "pydantic==2.11.4", "docstring-parser==0.16", "pyyaml==6.0.2", @@ -64,7 +63,7 @@ include-package-data = true where = [""] [tool.setuptools.package-data] -"flink_agents.lib" = ["*.jar"] +"flink_agents.lib" = ["**/*.jar"] # Optional dependencies (dependency groups) [project.optional-dependencies] diff --git a/tools/build.sh b/tools/build.sh index 435ce56..63f3ecd 100755 --- a/tools/build.sh +++ b/tools/build.sh @@ -46,13 +46,33 @@ if $build_java; then fi if $build_python; then - # copy flink-agents-dist jar to python lib + # copy flink-agents-dist jars to python lib with version subdirectories PYTHON_LIB_DIR=${PROJECT_ROOT}/python/flink_agents/lib rm -rf ${PYTHON_LIB_DIR} mkdir -p ${PYTHON_LIB_DIR} PROJECT_VERSION=$(sed -n 's/.*<version>\(.*\)<\/version>.*/\1/p' pom.xml | head -n 2 | tail -n 1) - cp "${PROJECT_ROOT}/dist/target/flink-agents-dist-${PROJECT_VERSION}.jar" ${PYTHON_LIB_DIR} + + # Automatically detect and copy all Flink version JARs from dist subdirectories + DIST_DIR="${PROJECT_ROOT}/dist" + for version_dir in "${DIST_DIR}"/flink-*; do + if [ -d "$version_dir" ]; then + version_name=$(basename "$version_dir") + echo "Processing $version_name..." + + # Create corresponding lib subdirectory + mkdir -p "${PYTHON_LIB_DIR}/${version_name}" + + # Find and copy the JAR file + jar_file="${version_dir}/target/flink-agents-dist-${version_name}-${PROJECT_VERSION}.jar" + if [ -f "$jar_file" ]; then + cp "$jar_file" "${PYTHON_LIB_DIR}/${version_name}/" + echo " Copied: flink-agents-dist-${version_name}-${PROJECT_VERSION}.jar" + else + echo " Warning: JAR file not found at $jar_file" + fi + fi + done # build python cd python diff --git a/tools/e2e.sh b/tools/e2e.sh index d48f18e..8e29aa2 100755 --- a/tools/e2e.sh +++ b/tools/e2e.sh @@ -14,6 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. # + +DEFAULT_FLINK_VERSION="2.2" + function run_test { local description="$1" local command="$2" @@ -140,6 +143,9 @@ if [[ ! -d "$python_dir" ]]; then exit 1 fi +cd "$python_dir" +uv pip install apache-flink~=${DEFAULT_FLINK_VERSION}.0 + run_test "Resource Cross-Language end-to-end test in Java" "run_resource_cross_language_test_in_java" run_test "Resource Cross-Language end-to-end test in Python" "run_resource_cross_language_test_in_python" run_test "Agent plan compatibility end-to-end test" "run_agent_plan_compatibility_test"
