This is an automated email from the ASF dual-hosted git repository.
jason810496 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 233d3cbd625 Guard JavaCoordinator JAR scan against symlink cycles (#67693)
233d3cbd625 is described below
commit 233d3cbd625a8870ca67d4ed9e5a63e7b0fcad91
Author: Jason(Zhe-You) Liu <[email protected]>
AuthorDate: Sun May 31 16:55:23 2026 +0800
Guard JavaCoordinator JAR scan against symlink cycles (#67693)
* Guard JavaCoordinator JAR scan against symlink cycles
* Iterate JAR directories lazily and log Path directly
Drop the per-directory list() materialization in the JAR scan in favor of
a lazy iterdir() walk, keeping a small helper so an unreadable directory is
skipped instead of aborting the scan. Pass the Path straight to the debug
log rather than stringifying it, matching the other log calls.
---
.../airflow/sdk/coordinators/java/coordinator.py | 38 +++++++++++++--
.../task_sdk/coordinators/java/test_coordinator.py | 57 ++++++++++++++++++++++
2 files changed, 92 insertions(+), 3 deletions(-)
diff --git a/task-sdk/src/airflow/sdk/coordinators/java/coordinator.py b/task-sdk/src/airflow/sdk/coordinators/java/coordinator.py
index 138c91cecf1..e91a26006c5 100644
--- a/task-sdk/src/airflow/sdk/coordinators/java/coordinator.py
+++ b/task-sdk/src/airflow/sdk/coordinators/java/coordinator.py
@@ -26,6 +26,7 @@ import pathlib
import selectors
import signal
import socket
+import stat
import subprocess
import time
import zipfile
@@ -62,13 +63,44 @@ def _start_server() -> socket.socket:
def _find_jars(items: Iterable[pathlib.Path]) -> Iterator[pathlib.Path]:
+ """
+ Yield JAR files under *items*, descending into directories.
+
+ A symlink loop or a directory that hardlinks into one of its ancestors
+ would otherwise recurse until the interpreter stack is exhausted, so
+ directories are deduplicated by ``(st_dev, st_ino)`` for the duration
+ of a single scan.
+ """
+ seen_dirs: set[tuple[int, int]] = set()
+ yield from _walk_jars(items, seen_dirs)
+
+
+def _walk_jars(items: Iterable[pathlib.Path], seen_dirs: set[tuple[int, int]])
-> Iterator[pathlib.Path]:
for item in items:
- if item.is_dir():
- yield from _find_jars(item.iterdir())
- elif item.is_file() and item.suffix == ".jar":
+ try:
+ st = item.stat()
+ except OSError:
+ continue
+ if stat.S_ISDIR(st.st_mode):
+ key = (st.st_dev, st.st_ino)
+ if key in seen_dirs:
+ log.debug("Skipping already-visited directory", path=item)
+ continue
+ seen_dirs.add(key)
+ yield from _walk_jars(_iter_dir(item), seen_dirs)
+ elif stat.S_ISREG(st.st_mode) and item.suffix == ".jar":
yield item
+def _iter_dir(directory: pathlib.Path) -> Iterator[pathlib.Path]:
+ # iterdir() is lazy, so an unreadable directory raises only once iteration
+ # starts; swallow it here so a single bad directory does not abort the scan.
+ try:
+ yield from directory.iterdir()
+ except OSError:
+ return
+
+
def _calculate_classpath(jars_root: Sequence[pathlib.Path]) -> str:
jars = (p.as_posix() for p in _find_jars(jars_root))
return os.pathsep.join(sorted(jars)) # Keep output deterministic.
diff --git a/task-sdk/tests/task_sdk/coordinators/java/test_coordinator.py b/task-sdk/tests/task_sdk/coordinators/java/test_coordinator.py
index 58014d76e32..25b9a323b5d 100644
--- a/task-sdk/tests/task_sdk/coordinators/java/test_coordinator.py
+++ b/task-sdk/tests/task_sdk/coordinators/java/test_coordinator.py
@@ -40,6 +40,7 @@ from airflow.sdk.coordinators.java.coordinator import (
_JavaActivitySubprocess,
_ResourceTracker,
_start_server,
+ _walk_jars,
)
from airflow.sdk.execution_time.coordinator import BaseCoordinator
from airflow.sdk.execution_time.supervisor import ActivitySubprocess
@@ -214,6 +215,62 @@ class TestMainJar:
with pytest.raises(FileNotFoundError, match="com.example.Missing"):
_JarInfo.find([tmp_path], "com.example.Missing")
+ def test_symlink_cycle_does_not_infinite_recurse(self, tmp_path):
+ nested = tmp_path / "inner"
+ nested.mkdir()
+ _make_jar(nested / "app.jar", main_class="com.example.Loop",
schema_version="2026-06-16")
+ loop = nested / "loop"
+ try:
+ loop.symlink_to(tmp_path)
+ except (OSError, NotImplementedError):
+ pytest.skip("symlinks not supported on this platform")
+
+ result = _JarInfo.find([tmp_path], "com.example.Loop")
+ assert result == _JarInfo("com.example.Loop", "2026-06-16")
+
+
+class TestWalkJars:
+ def test_skips_directory_whose_key_is_already_in_seen_dirs(self, tmp_path):
+ """A directory whose (st_dev, st_ino) is already in seen_dirs is
skipped."""
+ _make_jar(tmp_path / "app.jar", main_class="com.example.Main",
schema_version="2026-06-16")
+ st = tmp_path.stat()
+ seen_dirs: set[tuple[int, int]] = {(st.st_dev, st.st_ino)}
+ assert list(_walk_jars([tmp_path], seen_dirs)) == []
+
+ def test_records_visited_directories_in_seen_dirs(self, tmp_path):
+ """Every directory descended into is added to seen_dirs."""
+ sub = tmp_path / "sub"
+ sub.mkdir()
+ _make_jar(sub / "app.jar", main_class="com.example.Main",
schema_version="2026-06-16")
+ seen_dirs: set[tuple[int, int]] = set()
+ list(_walk_jars([tmp_path], seen_dirs))
+ assert (tmp_path.stat().st_dev, tmp_path.stat().st_ino) in seen_dirs
+ assert (sub.stat().st_dev, sub.stat().st_ino) in seen_dirs
+
+ def test_symlink_cycle_yields_each_jar_once(self, tmp_path):
+ """A symlink that loops back to an ancestor must not yield the same
JAR twice."""
+ nested = tmp_path / "inner"
+ nested.mkdir()
+ jar = _make_jar(nested / "app.jar", main_class="com.example.Loop",
schema_version="2026-06-16")
+ loop = nested / "loop"
+ try:
+ loop.symlink_to(tmp_path)
+ except (OSError, NotImplementedError):
+ pytest.skip("symlinks not supported on this platform")
+
+ seen_dirs: set[tuple[int, int]] = set()
+ yielded = list(_walk_jars([tmp_path], seen_dirs))
+ assert [p.resolve() for p in yielded] == [jar.resolve()]
+
+ def test_skip_logged_when_directory_revisited(self, tmp_path):
+ """A revisited directory triggers the 'Skipping already-visited
directory' debug log."""
+ sub = tmp_path / "sub"
+ sub.mkdir()
+ seen_dirs: set[tuple[int, int]] = {(sub.stat().st_dev,
sub.stat().st_ino)}
+ with patch("airflow.sdk.coordinators.java.coordinator.log") as
mock_log:
+ list(_walk_jars([sub], seen_dirs))
+ mock_log.debug.assert_any_call("Skipping already-visited directory",
path=sub)
+
class TestAcceptConnections:
def _connect_after_delay(self, addr: tuple[str, int], delay: float = 0.0)
-> None: