wzhero1 commented on code in PR #655: URL: https://github.com/apache/flink-agents/pull/655#discussion_r3266018155
########## python/flink_agents/runtime/skill/repository/skill_directory_reader.py: ########## @@ -0,0 +1,137 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +"""Read-only accessor for an on-disk directory of skills. + +Composed by :class:`SkillRepository` implementations to handle the +"parse SKILL.md under base_dir" half of their work, leaving each repo +free to manage its own materialization and ``close()`` story. +""" + +from __future__ import annotations + +import logging +import os +from pathlib import Path +from typing import TYPE_CHECKING, Dict, List + +from flink_agents.runtime.skill.skill_parser import SkillParser + +if TYPE_CHECKING: + from flink_agents.runtime.skill.agent_skill import AgentSkill + +logger = logging.getLogger(__name__) + + +class SkillDirectoryReader: + """Reads skills from an already-materialized directory. 
No lifecycle.""" + + SKILL_MD_FILE = "SKILL.md" + + def __init__(self, base_dir: Path) -> None: + """Wrap ``base_dir`` (must be an existing directory).""" + if base_dir is None: + msg = "Base directory cannot be None" + raise ValueError(msg) + resolved = Path(base_dir).resolve() + if not resolved.exists(): + msg = f"Path does not exist: {resolved}" + raise ValueError(msg) + if not resolved.is_dir(): + msg = f"Path must be a directory: {resolved}" + raise ValueError(msg) + self._base_dir = resolved + + @property + def base_dir(self) -> Path: + """Absolute base directory.""" + return self._base_dir + + def get_skill_dir(self, name: str) -> Path: + """``base_dir / name``; existence not checked.""" + return self._base_dir / name + + def get_skill(self, name: str) -> AgentSkill | None: + """Parse ``base_dir/name/SKILL.md``; ``None`` if absent.""" + skill_dir = self._base_dir / name + skill_md_path = skill_dir / self.SKILL_MD_FILE + if not skill_md_path.exists(): + return None + return self._load_skill(skill_dir) + + def get_skills(self) -> List[AgentSkill]: + """All skills under ``base_dir``, sorted by name.""" + skills: List[AgentSkill] = [] + for skill_name in self._list_skill_names(): + skill = self.get_skill(skill_name) + if skill is not None: + skills.append(skill) + return skills + + def get_resources(self, name: str) -> Dict[str, str]: + """All non-``SKILL.md`` files under the named skill, keyed by relative path.""" + skill_dir = self._base_dir / name + if not skill_dir.is_dir(): + return {} + return self._load_resources(skill_dir) + + def _list_skill_names(self) -> List[str]: + return sorted( + [ + entry.name + for entry in self._base_dir.iterdir() + if entry.is_dir() and (entry / self.SKILL_MD_FILE).exists() + ] + ) + + def _load_skill(self, skill_dir: Path) -> AgentSkill | None: + skill_md_path = skill_dir / self.SKILL_MD_FILE + if not skill_md_path.exists(): + return None + try: + content = skill_md_path.read_text() + skill = SkillParser.parse_skill(content) 
+ if skill.name != skill_dir.name: + logger.warning( + "The skill name %s is different from the base directory %s.", + skill.name, + skill_dir.name, + ) + except Exception as e: + err_msg = f"Failed to load skill from {skill_dir}" + raise ValueError(err_msg) from e + else: + return skill + + def _load_resources(self, skill_dir: Path) -> Dict[str, str]: + resources: Dict[str, str] = {} + for root, _dirs, files in os.walk(skill_dir): + root_path = Path(root) + for file_name in files: + if file_name == self.SKILL_MD_FILE: + continue + file_path = root_path / file_name + relative_path = str(file_path.relative_to(skill_dir)) + try: + resources[relative_path] = file_path.read_text() + except UnicodeDecodeError: + resources[relative_path] = f"base64: {file_path.read_bytes()}" Review Comment: **This branch is not base64-encoding — binary resources are silently corrupted.** ```python resources[relative_path] = f"base64: {file_path.read_bytes()}" ``` The f-string returns the `bytes` repr (literal `"base64: b'\\x89PNG...'"`), not encoded data. `base64.b64decode(value.removeprefix("base64:"))` will raise on any real payload. Java sibling uses `Base64.getEncoder().encodeToString(...)`, so the cross-language contract diverges silently and every binary resource read via `AgentSkill.get_resource()` on Python is unusable. Fix: `"base64:" + base64.b64encode(file_path.read_bytes()).decode("ascii")`. Worth adding a PNG round-trip test through both languages — none exists today. ########## python/flink_agents/runtime/skill/skill_manager.py: ########## @@ -93,42 +108,91 @@ def get_skill_dirs(self, *names: str) -> List[str]: """Return absolute directory paths for the given skill names. If no names are provided, returns directories for all filesystem-backed - skills. Unknown names and skills not backed by a filesystem repo are - silently skipped. + skills. Skills not backed by a filesystem repo are silently skipped. + + Raises: + ValueError: If any provided name is not a registered skill. 
""" selected = names if names else tuple(self._repos.keys()) dirs: List[str] = [] for skill_name in selected: repo = self._repos.get(skill_name) - if isinstance(repo, FileSystemSkillRepository): - dirs.append(str(repo.base_dir / skill_name)) + if repo is None: + msg = ( + f"Skill {skill_name} not found, " + f"available skill names are: {list(self._repos.keys())}" + ) + raise ValueError(msg) + dir_path = repo.get_skill_dir(skill_name) + if dir_path is not None: + dirs.append(str(dir_path)) return dirs def get_skill_dir(self, skill_name: str) -> Path | None: """Return absolute directory path for a single skill, if filesystem-backed.""" repo = self._repos.get(skill_name) - if isinstance(repo, FileSystemSkillRepository): - return repo.base_dir / skill_name - return None + return None if repo is None else repo.get_skill_dir(skill_name) def resolve_resource_path(self, skill_name: str, resource_path: str) -> Path | None: """Resolve a skill resource's relative path to an absolute filesystem path. Returns None if the skill's repository doesn't support path resolution. 
""" repo = self._repos.get(skill_name) - if isinstance(repo, FileSystemSkillRepository): - resolved = repo.base_dir / skill_name / resource_path - if resolved.exists() and resolved.is_file(): - return resolved - return None - - def _load_skills_from_paths(self) -> None: - for path in self._config.paths: - repo = FileSystemSkillRepository(path) - for skill in repo.get_skills(): - skill.set_resource_loader( - lambda name=skill.name, r=repo: r.get_resources(name) + if repo is None: + return None + dir_path = repo.get_skill_dir(skill_name) + if dir_path is None: + return None + resolved = dir_path / resource_path + return resolved if resolved.is_file() else None + + def _load_skills(self) -> None: + for spec in self._config.sources: + try: + handler = skill_source_registry.get(spec.scheme) + repo = handler.open(spec.params) + except (OSError, ValueError) as e: + msg = ( + f"Failed to load skills from {spec.scheme}:{spec.params}" ) - self._skills[skill.name] = skill - self._repos[skill.name] = repo + raise RuntimeError(msg) from e + self._register_repo(repo, _origin_of(spec)) Review Comment: **Partial-load leak: a failed source leaves earlier-opened repos uncleaned.** Java's `loadAll()` now wraps each `open()` in try/catch and closes already-opened repos on failure (round-2 follow-up). Python here still just re-raises — if source #3 of 5 fails, sources #1 and #2 keep their extracted temp dirs and `atexit` handlers for the interpreter's lifetime. In a long-lived TaskManager Python worker every failed reload leaks. Same try/finally + first-exception pattern as the Java side: collect opened repos in a list, close them on the failure path before re-raising. 
########## runtime/src/main/java/org/apache/flink/agents/runtime/skill/SkillManager.java: ########## @@ -111,38 +148,113 @@ public List<String> getSkillDirs(List<String> names) { @Nullable public Path getSkillDir(String skillName) { SkillRepository repo = repos.get(skillName); - if (repo instanceof FileSystemSkillRepository) { - return ((FileSystemSkillRepository) repo).getBaseDir().resolve(skillName); - } - return null; + return repo == null ? null : repo.getSkillDir(skillName); } /** Resolve a skill resource's relative path to an absolute path, or {@code null} if missing. */ @Nullable public Path resolveResourcePath(String skillName, String resourcePath) { SkillRepository repo = repos.get(skillName); - if (repo instanceof FileSystemSkillRepository) { - Path resolved = - ((FileSystemSkillRepository) repo) - .getBaseDir() - .resolve(skillName) - .resolve(resourcePath); - if (Files.isRegularFile(resolved)) { - return resolved; + if (repo == null) { + return null; + } + Path dir = repo.getSkillDir(skillName); + if (dir == null) { + return null; + } + Path resolved = dir.resolve(resourcePath); + return Files.isRegularFile(resolved) ? resolved : null; + } + + private void loadAll() { + for (SkillSourceSpec spec : config.getSources()) { + try { + SkillRepository repo = + SkillSourceRegistry.get(spec.getScheme()) + .open(spec.getParams(), classLoader); + registerRepo(repo, originOf(spec)); + } catch (IOException | IllegalArgumentException e) { + IllegalStateException toThrow = + new IllegalStateException( + "Failed to load skills from " + + spec.getScheme() + + ":" + + spec.getParams(), + e); + // Release repos registered before this point. The caller never receives a + // SkillManager reference (we're throwing from the constructor path), so + // without this cleanup their shutdown hooks + temp dirs would leak until + // JVM exit. 
+ try { + closeRepos(); + } catch (Exception cleanupError) { + toThrow.addSuppressed(cleanupError); + } + throw toThrow; + } + } + } + + /** + * Build a {@link SkillOrigin} from a spec for diagnostics (WARN on duplicates, etc.). The + * location description is delegated to the handler registered for the scheme — see {@link + * SkillSourceHandler#describeLocation(Map)} — so adding a new scheme is a single registry call. + */ + private static SkillOrigin originOf(SkillSourceSpec spec) { + SkillSourceHandler handler = SkillSourceRegistry.get(spec.getScheme()); + return new SkillOrigin(spec.getScheme(), handler.describeLocation(spec.getParams())); + } + + /** + * Close every owned {@link SkillRepository}, releasing any temp directories materialized for + * URL / classpath-zip / classpath-jar sources. Idempotent (delegated repos use {@link + * java.util.concurrent.atomic.AtomicBoolean} guards). + * + * <p>Mirrors {@code ResourceCache.close()}: the first failure is rethrown after every repo has + * been attempted, with subsequent failures attached as suppressed exceptions. This surfaces + * real shutdown bugs (locked files, permission denied, disk full) instead of silently + * swallowing them. + */ + @Override + public void close() throws Exception { + closeRepos(); + } + + private void closeRepos() throws Exception { + // repos may map multiple skill names to the same repo instance; dedup by identity. 
+ Set<SkillRepository> unique = Collections.newSetFromMap(new IdentityHashMap<>()); + unique.addAll(repos.values()); + Exception firstException = null; + for (SkillRepository repo : unique) { + try { + repo.close(); + } catch (Exception e) { + if (firstException == null) { + firstException = e; + } else { + firstException.addSuppressed(e); + } } } - return null; + if (firstException != null) { + throw firstException; + } } - private void loadFromPaths() { - for (String path : config.getPaths()) { - FileSystemSkillRepository repo = new FileSystemSkillRepository(path); - for (AgentSkill skill : repo.getSkills()) { - final String skillName = skill.getName(); - skill.setResourceLoader(() -> repo.getResources(skillName)); - skills.put(skillName, skill); - repos.put(skillName, repo); + private void registerRepo(SkillRepository repo, SkillOrigin origin) { + for (AgentSkill skill : repo.getSkills()) { + final String skillName = skill.getName(); + skill.setResourceLoader(() -> repo.getResources(skillName)); + skill.setOrigin(origin); + AgentSkill previous = skills.put(skillName, skill); + if (previous != null) { + LOG.warn( + "Skill '{}' from {} overrides earlier registration from {}", + skillName, + origin, + previous.getOrigin() == null ? "<unknown>" : previous.getOrigin()); } + repos.put(skillName, repo); Review Comment: **Duplicate skill name overwrites the repo-index entry; the displaced repo is orphaned from `closeRepos()`. Bug exists in both languages.** ```java repos.put(skillName, repo); // overwrites previous repo ref ``` `closeRepos()` iterates `repos.values()`, so when two sources contribute skill `"foo"` the displaced repo is no longer reachable — its temp dir survives until JVM exit, only the shutdown hook ever fires. The recently-added close-cascade plumbing can't help because the orphan is never visited. Python mirrors the same shape at `python/.../skill_manager.py:177`. 
Fix: track opened repos in a separate `List<SkillRepository>` and iterate *that* in `closeRepos()` (dedup with `IdentityHashMap`).

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
