wzhero1 commented on code in PR #655:
URL: https://github.com/apache/flink-agents/pull/655#discussion_r3234514268


##########
api/src/main/java/org/apache/flink/agents/api/skills/Skills.java:
##########
@@ -51,33 +57,76 @@ public class Skills extends SerializableResource {
     /** Reserved name of the built-in bash tool used to execute skill scripts. */
     public static final String BASH_TOOL = "bash";
 
-    private List<String> paths;
+    private final List<String> paths;
+    private final List<String> urls;
+    private final List<String> classpathResources;

Review Comment:
   **Adding a new source today forces five parallel changes**: a `Skills` field, a 
factory method, an `XxxSkillRepository`, a `SkillManager` for-loop, and an 
`AgentPlan` merge branch.
   
   Suggest a discriminated union: a single `sources` field holding a list of 
`SkillSourceSpec(scheme, location)` entries, plus a `scheme → handler` registry. 
Adding a source then takes one registration and touches no other file. Bonus: 
cross-language ser/de becomes uniform.
   
   This affects the wire format, so it is much cheaper to fix before 0.3.0 ships.
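
A minimal sketch of the registry idea, to make the suggestion concrete. `SkillSourceSpec` is the name proposed above; `SkillSourceRegistry`, its methods, and the `Function<String, Path>` handler shape are hypothetical and not part of the PR.

```java
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical spec: one (scheme, location) pair per skill source. */
final class SkillSourceSpec {
    final String scheme;
    final String location;

    SkillSourceSpec(String scheme, String location) {
        this.scheme = scheme;
        this.location = location;
    }
}

/** Hypothetical registry mapping a scheme to the handler that materializes it. */
final class SkillSourceRegistry {
    private final Map<String, Function<String, Path>> handlers = new ConcurrentHashMap<>();

    /** Registers a handler; a second registration for the same scheme is rejected. */
    void register(String scheme, Function<String, Path> handler) {
        if (handlers.putIfAbsent(scheme, handler) != null) {
            throw new IllegalStateException("Scheme already registered: " + scheme);
        }
    }

    /** Looks up the handler by scheme and lets it turn the location into a local path. */
    Path resolve(SkillSourceSpec spec) {
        Function<String, Path> handler = handlers.get(spec.scheme);
        if (handler == null) {
            throw new IllegalArgumentException("No handler for scheme: " + spec.scheme);
        }
        return handler.apply(spec.location);
    }
}
```

With this shape, `SkillManager` would iterate one list and call `resolve` per entry, and an unknown scheme fails with a message that names it.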



##########
runtime/src/main/java/org/apache/flink/agents/runtime/skill/repository/ClasspathSkillRepository.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.skill.repository;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Enumeration;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+
+/**
+ * Skill repository backed by a classpath resource — either a directory under {@code
+ * src/main/resources} (typical dev / Maven layout) or a path inside a JAR on the classpath (typical
+ * deployment).
+ *
+ * <p>For directory resources the underlying {@link FileSystemSkillRepository} uses the real on-disk
+ * path. For JAR-entry resources, the matching entries are extracted into a process-local temp
+ * directory (cleaned up at JVM exit).
+ *
+ * <p>When a {@link URLClassLoader} is used and the JAR does not contain an explicit directory entry
+ * for the resource prefix, this class falls back to scanning the classloader's URLs directly to
+ * synthesize the correct {@code jar:} URL.
+ */
+public class ClasspathSkillRepository extends FileSystemSkillRepository {
+
+    public ClasspathSkillRepository(String resource) throws IOException {
+        this(resource, Thread.currentThread().getContextClassLoader());
+    }
+
+    /**
+     * Constructor that accepts an explicit classloader. Useful in tests to inject a {@code
+     * URLClassLoader} pointing at a freshly-built jar.
+     */
+    public ClasspathSkillRepository(String resource, ClassLoader classLoader) throws IOException {
+        super(materialize(resource, classLoader));
+    }
+
+    private static Path materialize(String resource, ClassLoader classLoader) throws IOException {
+        URL url = classLoader.getResource(resource);
+        if (url == null) {
+            // Fallback for URLClassLoader: JARs without explicit directory entries will not return
+            // a URL for a prefix via getResource(). Scan the classloader's URLs directly to find
+            // any JAR entry that starts with the resource prefix.
+            url = findInUrlClassLoader(resource, classLoader);
+        }
+        if (url == null) {
+            throw new IllegalArgumentException("Classpath resource not found: " + resource);
+        }
+        String protocol = url.getProtocol();
+        if ("file".equals(protocol)) {
+            Path p;
+            try {
+                p = Paths.get(url.toURI());
+            } catch (URISyntaxException e) {
+                throw new IOException("Bad classpath URL: " + url, e);
+            }
+            if (Files.isDirectory(p)) {
+                return p;
+            }
+            if (Files.isRegularFile(p) && p.toString().toLowerCase().endsWith(".zip")) {
+                return SkillMaterializer.extractZipSafely(p);
+            }
+            throw new IllegalArgumentException(
+                    "Classpath resource must be a directory or a .zip: " + p);
+        }
+        if ("jar".equals(protocol)) {
+            return SkillMaterializer.extractClasspathFromJar(url, resource);
+        }
+        throw new IllegalArgumentException(

Review Comment:
   **⚠️ Flink fat-jar / nested-jar deployment is not covered.**
   
   Flink user JARs are typically built with Maven Shade or packaged jar-in-jar; 
there, `getResource("skills")` returns URLs with protocols like `jar:nested:`, 
which fall through to the throw right here. `findInUrlClassLoader` also assumes 
a `URLClassLoader`, not Flink's `ChildFirstClassLoader`.
   
   Issue #593 explicitly targets this scenario. Before merge: (1) add an 
end-to-end test on a Flink mini-cluster, and (2) use 
`ClassLoader.getResources()` + `getResourceAsStream` instead of dispatching on 
URL protocol.
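
A rough sketch of the stream-based direction, under one loud assumption: a classloader cannot list the entries under a prefix, so the relative file names have to come from somewhere else (for example a build-time index). That list, and the `StreamingResourceCopier` class itself, are hypothetical. The point is only that reading through `getResourceAsStream` never inspects the URL protocol.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

/** Hypothetical helper; not part of the PR. */
final class StreamingResourceCopier {
    private StreamingResourceCopier() {}

    /**
     * Copies the listed files under {@code prefix} into a fresh temp directory. Each file is
     * read via getResourceAsStream, so file:, jar:, and nested-jar layouts are handled alike.
     */
    static Path copy(ClassLoader cl, String prefix, List<String> relativeFiles)
            throws IOException {
        Path target = Files.createTempDirectory("skills-sketch-");
        for (String rel : relativeFiles) {
            Path out = target.resolve(rel).normalize();
            // Same containment check as zip-slip: never write outside the target directory.
            if (!out.startsWith(target)) {
                throw new IOException("Unsafe resource path: " + rel);
            }
            String name = prefix + "/" + rel;
            try (InputStream in = cl.getResourceAsStream(name)) {
                if (in == null) {
                    throw new IOException("Missing classpath resource: " + name);
                }
                Files.createDirectories(out.getParent());
                Files.copy(in, out);
            }
        }
        return target;
    }
}
```

Whether an index file is acceptable is a design question for the PR; the sketch does not settle it.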



##########
runtime/src/main/java/org/apache/flink/agents/runtime/skill/repository/URLSkillRepository.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.skill.repository;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+/**
+ * Skill repository backed by an http(s) URL pointing to a zip.
+ *
+ * <p>The zip is downloaded to a temp file and extracted into a process-local temp directory
+ * (cleaned up at JVM exit). The downloaded zip itself is removed once extraction completes.
+ */
+public class URLSkillRepository extends FileSystemSkillRepository {

Review Comment:
   **Inheritance is being used as composition.**
   
   A URL repo is not logically *a* FileSystem repo — it fetches over the 
network, then delegates. Side effects: the constructor performs network I/O 
before `super(...)`, the original `url` is lost for diagnostics, there's no 
`close()` lifecycle, and the fetch step can't be mocked alone.
   
   Suggest a `SkillSource { Path materialize() }` interface composed into one 
`MaterializingSkillRepository`. Same applies to `ClasspathSkillRepository` and 
`PackageSkillRepository`.
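
Roughly what the composition could look like. `SkillSource` and `MaterializingSkillRepository` are the names proposed above; the `describe()` method and the lazy `baseDir()` accessor are my own additions for illustration, not existing API.

```java
import java.io.IOException;
import java.nio.file.Path;

/** Hypothetical: anything that can produce a local directory of skills. */
interface SkillSource {
    Path materialize() throws IOException;

    /** Human-readable origin, kept for logs and error messages. */
    String describe();
}

/** Hypothetical repository that composes a source instead of extending a filesystem repo. */
final class MaterializingSkillRepository {
    private final SkillSource source;
    private Path baseDir; // resolved lazily, so the constructor performs no I/O

    MaterializingSkillRepository(SkillSource source) {
        this.source = source;
    }

    /** Materializes on first use and caches the resulting directory. */
    synchronized Path baseDir() throws IOException {
        if (baseDir == null) {
            baseDir = source.materialize();
        }
        return baseDir;
    }

    /** The original location survives for diagnostics. */
    String origin() {
        return source.describe();
    }
}
```

The fetch step can then be replaced by a stub `SkillSource` in tests, and a `close()` hook would have an obvious home on the repository.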



##########
api/src/main/java/org/apache/flink/agents/api/skills/Skills.java:
##########
@@ -51,33 +57,76 @@ public class Skills extends SerializableResource {
     /** Reserved name of the built-in bash tool used to execute skill scripts. */
     public static final String BASH_TOOL = "bash";
 
-    private List<String> paths;
+    private final List<String> paths;

Review Comment:
   **Skill names are flat — no namespace, no version.** With this PR opening 
multiple loading sources, two teams' `github` skills will collide (last writer 
silently wins).
   
   Options: (1) `namespace:` in SKILL.md frontmatter → `team-a/github`, (2) 
per-source namespace `fromUrl(url, namespace="team-a")`, (3) source identity 
*is* the namespace. Wire-format-affecting — easier now than after 0.3.0.
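
To illustrate options (1) and (2), a small sketch of namespace-qualified keys. `NamespacedSkillIndex` and the `namespace/name` key format are assumptions for illustration; nothing like this exists in the PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical index keyed by "namespace/name" instead of the flat skill name. */
final class NamespacedSkillIndex {
    private final Map<String, String> locationByQualifiedName = new LinkedHashMap<>();

    /** An empty or null namespace keeps today's flat name, for backward compatibility. */
    static String qualify(String namespace, String skillName) {
        if (namespace == null || namespace.isEmpty()) {
            return skillName;
        }
        return namespace + "/" + skillName;
    }

    /** Registers a skill; a clash inside one namespace fails instead of overwriting. */
    void register(String namespace, String skillName, String location) {
        String key = qualify(namespace, skillName);
        String previous = locationByQualifiedName.putIfAbsent(key, location);
        if (previous != null) {
            throw new IllegalStateException(
                    "Duplicate skill '" + key + "': " + previous + " vs " + location);
        }
    }

    String locationOf(String qualifiedName) {
        return locationByQualifiedName.get(qualifiedName);
    }
}
```

Two teams can then both ship a `github` skill, addressed as `team-a/github` and `team-b/github`.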



##########
runtime/src/main/java/org/apache/flink/agents/runtime/skill/SkillManager.java:
##########
@@ -134,15 +137,33 @@ public Path resolveResourcePath(String skillName, String resourcePath) {
         return null;
     }
 
-    private void loadFromPaths() {
+    private void loadAll() {
         for (String path : config.getPaths()) {
-            FileSystemSkillRepository repo = new FileSystemSkillRepository(path);
-            for (AgentSkill skill : repo.getSkills()) {
-                final String skillName = skill.getName();
-                skill.setResourceLoader(() -> repo.getResources(skillName));
-                skills.put(skillName, skill);
-                repos.put(skillName, repo);
+            registerRepo(new FileSystemSkillRepository(path));
+        }
+        for (String url : config.getUrls()) {
+            try {
+                registerRepo(new URLSkillRepository(url));
+            } catch (IOException e) {
+                throw new IllegalStateException("Failed to load skills from URL: " + url, e);
+            }
+        }
+        for (String resource : config.getClasspathResources()) {
+            try {
+                registerRepo(new ClasspathSkillRepository(resource));
+            } catch (IOException e) {
+                throw new IllegalStateException(
+                        "Failed to load skills from classpath: " + resource, e);
             }
         }
     }

Review Comment:
   **`AgentSkill` doesn't know where it came from.** Origin is captured only 
inside a lambda; `SkillManager` keeps a parallel `repos` map to compensate. 
Logs and errors can't name the source, the duplicate-name WARN can't either, 
and serialization loses origin entirely.
   
   Suggest a declarative `record SkillOrigin(String scheme, String location, 
Instant loadedAt)` field on `AgentSkill`.
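
A sketch of the record and how a skill could carry it. The record signature is the one suggested above and needs Java 16 or newer; `display()` and the `OriginAwareSkill` stand-in are hypothetical, since the real `AgentSkill` fields are not shown in this diff.

```java
import java.time.Instant;
import java.util.Objects;

/** The record shape suggested in the review; requires Java 16 or newer. */
record SkillOrigin(String scheme, String location, Instant loadedAt) {
    SkillOrigin {
        Objects.requireNonNull(scheme, "scheme");
        Objects.requireNonNull(location, "location");
        Objects.requireNonNull(loadedAt, "loadedAt");
    }

    /** Compact form for log lines: scheme and location joined by a colon. */
    String display() {
        return scheme + ":" + location;
    }
}

/** Hypothetical stand-in for AgentSkill, reduced to what this sketch needs. */
final class OriginAwareSkill {
    private final String name;
    private final SkillOrigin origin;

    OriginAwareSkill(String name, SkillOrigin origin) {
        this.name = Objects.requireNonNull(name, "name");
        this.origin = Objects.requireNonNull(origin, "origin");
    }

    SkillOrigin origin() {
        return origin;
    }

    /** What a log or error message could say once the skill knows its source. */
    String describe() {
        return name + " (from " + origin.display() + ")";
    }
}
```

With the origin on the skill itself, the parallel `repos` map is no longer the only place that remembers where a skill came from.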



##########
runtime/src/main/java/org/apache/flink/agents/runtime/skill/repository/SkillMaterializer.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.skill.repository;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.Comparator;
+import java.util.Enumeration;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+import java.util.stream.Stream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+
+/**
+ * Internal helpers for materializing skill sources (zip files, URL downloads, classpath JAR
+ * entries) into a local temp directory. Cleanup is registered as a JVM shutdown hook so the temp
+ * directories are removed when the process exits.
+ */
+public final class SkillMaterializer {
+
+    private static final String TEMP_DIR_PREFIX = "flink-agents-skills-";
+
+    private SkillMaterializer() {}
+
+    /**
+     * Register a JVM shutdown hook that removes {@code path} recursively. Failures during deletion
+     * are silently ignored (best-effort cleanup).
+     */
+    public static void registerCleanup(Path path) {
+        Thread hook =
+                new Thread(() -> deleteRecursively(path), "skill-cleanup-" + path.getFileName());
+        Runtime.getRuntime().addShutdownHook(hook);
+    }
+
+    /**
+     * Extract a zip into a fresh temp directory and return that directory. Validates every entry
+     * against zip-slip (paths must resolve inside the extraction directory). Registers a JVM
+     * shutdown hook to remove the extraction directory at process exit.
+     *
+     * @throws IOException if any zip entry resolves outside the extraction directory.
+     */
+    public static Path extractZipSafely(Path zipPath) throws IOException {
+        Path extractDir = Files.createTempDirectory(TEMP_DIR_PREFIX);
+        // Register cleanup before validation so the empty tempdir is always reclaimed,
+        // even if validation raises.
+        registerCleanup(extractDir);
+        try (ZipFile zf = new ZipFile(zipPath.toFile())) {
+            // First pass: zip-slip validation.
+            Enumeration<? extends ZipEntry> entries = zf.entries();
+            while (entries.hasMoreElements()) {

Review Comment:
   **Two-pass scan can be merged into one** — validate-then-extract per entry 
in a single pass. Not a perf concern, just shorter.
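
For reference, the merged loop could look like the sketch below. `SinglePassZipExtractor` is a hypothetical name, and the caller is assumed to supply the extraction directory (in the PR that would be the temp directory with cleanup already registered).

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Enumeration;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

/** Hypothetical single-pass variant of the extraction loop. */
final class SinglePassZipExtractor {
    private SinglePassZipExtractor() {}

    /** Validates each entry against zip-slip and extracts it in the same iteration. */
    static Path extract(Path zipPath, Path extractDir) throws IOException {
        Path root = extractDir.toAbsolutePath().normalize();
        try (ZipFile zf = new ZipFile(zipPath.toFile())) {
            Enumeration<? extends ZipEntry> entries = zf.entries();
            while (entries.hasMoreElements()) {
                ZipEntry entry = entries.nextElement();
                Path target = root.resolve(entry.getName()).normalize();
                // Validate first; nothing is written for an entry that escapes the root.
                if (!target.startsWith(root)) {
                    throw new IOException("Unsafe zip entry: " + entry.getName());
                }
                if (entry.isDirectory()) {
                    Files.createDirectories(target);
                } else {
                    Files.createDirectories(target.getParent());
                    try (InputStream in = zf.getInputStream(entry)) {
                        Files.copy(in, target);
                    }
                }
            }
        }
        return root;
    }
}
```

One behavioral difference worth noting: if an unsafe entry appears late in the archive, earlier entries have already been written when the exception is thrown, whereas the two-pass version rejects the archive before writing anything. Since cleanup is registered up front, that partial output is still removed at JVM exit.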



##########
runtime/src/main/java/org/apache/flink/agents/runtime/skill/SkillManager.java:
##########
@@ -134,15 +137,33 @@ public Path resolveResourcePath(String skillName, String resourcePath) {
         return null;
     }
 
-    private void loadFromPaths() {
+    private void loadAll() {
         for (String path : config.getPaths()) {
-            FileSystemSkillRepository repo = new FileSystemSkillRepository(path);
-            for (AgentSkill skill : repo.getSkills()) {
-                final String skillName = skill.getName();
-                skill.setResourceLoader(() -> repo.getResources(skillName));
-                skills.put(skillName, skill);
-                repos.put(skillName, repo);
+            registerRepo(new FileSystemSkillRepository(path));
+        }
+        for (String url : config.getUrls()) {
+            try {
+                registerRepo(new URLSkillRepository(url));
+            } catch (IOException e) {
+                throw new IllegalStateException("Failed to load skills from URL: " + url, e);
+            }
+        }
+        for (String resource : config.getClasspathResources()) {
+            try {
+                registerRepo(new ClasspathSkillRepository(resource));
+            } catch (IOException e) {
+                throw new IllegalStateException(
+                        "Failed to load skills from classpath: " + resource, e);
             }
         }
     }
+
+    private void registerRepo(SkillRepository repo) {

Review Comment:
   **Duplicate skill names overwrite silently with no log.** Multi-source name 
collisions go through `skills.put()` and the later registration wins — no WARN, 
no error. Users debugging "why doesn't my `SKILL.md` take effect" get no 
diagnostic.
   
   Minimum: WARN with both origins. Better: document the `paths → urls → 
classpath` precedence. (Naming the source concretely needs the `SkillOrigin` 
change on `AgentSkill`.)
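
A minimal sketch of the WARN, relying on the fact that `Map.put` returns the value it replaced. `SkillRegistrySketch` is hypothetical, origins are plain strings here, and `java.util.logging` is used only to keep the example self-contained; the project's own logger would take its place.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.logging.Logger;

/** Hypothetical reduction of the registration step to the collision check. */
final class SkillRegistrySketch {
    private static final Logger LOG = Logger.getLogger(SkillRegistrySketch.class.getName());

    private final Map<String, String> originBySkill = new LinkedHashMap<>();

    /**
     * Registers a skill, later registrations winning as they do today.
     *
     * @return the origin that was replaced, or null if the name was new
     */
    String register(String skillName, String origin) {
        String previous = originBySkill.put(skillName, origin);
        if (previous != null) {
            LOG.warning(
                    String.format(
                            "Skill '%s' from %s overrides the one from %s",
                            skillName, origin, previous));
        }
        return previous;
    }

    String originOf(String skillName) {
        return originBySkill.get(skillName);
    }
}
```

Returning the replaced origin also makes the precedence rule easy to assert in a unit test.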



##########
runtime/src/main/java/org/apache/flink/agents/runtime/skill/repository/SkillMaterializer.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.skill.repository;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.Comparator;
+import java.util.Enumeration;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+import java.util.stream.Stream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+
+/**
+ * Internal helpers for materializing skill sources (zip files, URL downloads, classpath JAR
+ * entries) into a local temp directory. Cleanup is registered as a JVM shutdown hook so the temp
+ * directories are removed when the process exits.
+ */
+public final class SkillMaterializer {
+
+    private static final String TEMP_DIR_PREFIX = "flink-agents-skills-";
+
+    private SkillMaterializer() {}
+
+    /**
+     * Register a JVM shutdown hook that removes {@code path} recursively. Failures during deletion
+     * are silently ignored (best-effort cleanup).
+     */
+    public static void registerCleanup(Path path) {
+        Thread hook =
+                new Thread(() -> deleteRecursively(path), "skill-cleanup-" + path.getFileName());
+        Runtime.getRuntime().addShutdownHook(hook);
+    }
+
+    /**
+     * Extract a zip into a fresh temp directory and return that directory. Validates every entry
+     * against zip-slip (paths must resolve inside the extraction directory). Registers a JVM
+     * shutdown hook to remove the extraction directory at process exit.
+     *
+     * @throws IOException if any zip entry resolves outside the extraction directory.
+     */
+    public static Path extractZipSafely(Path zipPath) throws IOException {
+        Path extractDir = Files.createTempDirectory(TEMP_DIR_PREFIX);
+        // Register cleanup before validation so the empty tempdir is always reclaimed,
+        // even if validation raises.
+        registerCleanup(extractDir);
+        try (ZipFile zf = new ZipFile(zipPath.toFile())) {
+            // First pass: zip-slip validation.
+            Enumeration<? extends ZipEntry> entries = zf.entries();
+            while (entries.hasMoreElements()) {
+                ZipEntry entry = entries.nextElement();
+                Path resolved = extractDir.resolve(entry.getName()).normalize();
+                if (!resolved.startsWith(extractDir)) {
+                    throw new IOException("Unsafe zip entry: " + entry.getName());
+                }
+            }
+            // Second pass: extract.
+            entries = zf.entries();
+            while (entries.hasMoreElements()) {
+                ZipEntry entry = entries.nextElement();
+                Path target = extractDir.resolve(entry.getName()).normalize();
+                if (entry.isDirectory()) {
+                    Files.createDirectories(target);
+                } else {
+                    Files.createDirectories(target.getParent());
+                    try (InputStream in = zf.getInputStream(entry)) {
+                        Files.copy(in, target);
+                    }
+                }
+            }
+        }
+        return extractDir;
+    }
+
+    /**
+     * Extract every JAR entry whose name starts with {@code resourcePrefix + "/"} into a fresh temp
+     * directory. The prefix itself is stripped (so entries under {@code skills/skill-a/...} extract
+     * as {@code skill-a/...}).
+     *
+     * <p>Registers a JVM shutdown hook for cleanup. Rejects entries that would resolve outside the
+     * extraction directory (zip-slip).
+     */
+    public static Path extractClasspathFromJar(URL jarUrl, String resourcePrefix)
+            throws IOException {
+        // Parse the JAR file URL from the jar: URL. The format is jar:<jar-file-url>!/[entry].
+        // We extract just the inner jar-file URL so we can open the whole JarFile and enumerate
+        // all entries — JarURLConnection.getJarFile() would fail when the entry specifier names a
+        // prefix that has no corresponding stored directory entry.
+        String spec = jarUrl.toString();
+        int sep = spec.indexOf("!/");
+        String innerSpec = sep >= 0 ? spec.substring(4, sep) : spec.substring(4);

Review Comment:
   **Magic number.** `4` is `"jar:".length()`. Extract `private static final 
int JAR_URL_PREFIX_LEN = "jar:".length();`.
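
Spelled out, the parsing step with named constants might read as follows. `JarUrlSketch`, `ENTRY_SEPARATOR`, and the `startsWith` guard are my additions for illustration; the guard is not in the PR.

```java
/** Hypothetical extraction of the jar: URL parsing with the magic number named. */
final class JarUrlSketch {
    private static final String JAR_URL_PREFIX = "jar:";
    private static final int JAR_URL_PREFIX_LEN = JAR_URL_PREFIX.length();
    private static final String ENTRY_SEPARATOR = "!/";

    private JarUrlSketch() {}

    /** Returns the inner jar-file URL from a spec of the form jar:<jar-file-url>!/[entry]. */
    static String innerSpec(String jarUrlSpec) {
        // Extra guard (assumption): fail clearly instead of slicing an unrelated URL.
        if (!jarUrlSpec.startsWith(JAR_URL_PREFIX)) {
            throw new IllegalArgumentException("Not a jar: URL: " + jarUrlSpec);
        }
        int sep = jarUrlSpec.indexOf(ENTRY_SEPARATOR);
        return sep >= 0
                ? jarUrlSpec.substring(JAR_URL_PREFIX_LEN, sep)
                : jarUrlSpec.substring(JAR_URL_PREFIX_LEN);
    }
}
```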



##########
runtime/src/main/java/org/apache/flink/agents/runtime/skill/repository/ClasspathSkillRepository.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.skill.repository;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Enumeration;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+
+/**
+ * Skill repository backed by a classpath resource — either a directory under {@code
+ * src/main/resources} (typical dev / Maven layout) or a path inside a JAR on the classpath (typical
+ * deployment).
+ *
+ * <p>For directory resources the underlying {@link FileSystemSkillRepository} uses the real on-disk
+ * path. For JAR-entry resources, the matching entries are extracted into a process-local temp
+ * directory (cleaned up at JVM exit).
+ *
+ * <p>When a {@link URLClassLoader} is used and the JAR does not contain an explicit directory entry
+ * for the resource prefix, this class falls back to scanning the classloader's URLs directly to
+ * synthesize the correct {@code jar:} URL.
+ */
+public class ClasspathSkillRepository extends FileSystemSkillRepository {
+
+    public ClasspathSkillRepository(String resource) throws IOException {
+        this(resource, Thread.currentThread().getContextClassLoader());
+    }
+
+    /**
+     * Constructor that accepts an explicit classloader. Useful in tests to inject a {@code
+     * URLClassLoader} pointing at a freshly-built jar.
+     */
+    public ClasspathSkillRepository(String resource, ClassLoader classLoader) throws IOException {
+        super(materialize(resource, classLoader));
+    }
+
+    private static Path materialize(String resource, ClassLoader classLoader) throws IOException {
+        URL url = classLoader.getResource(resource);
+        if (url == null) {
+            // Fallback for URLClassLoader: JARs without explicit directory entries will not return
+            // a URL for a prefix via getResource(). Scan the classloader's URLs directly to find
+            // any JAR entry that starts with the resource prefix.
+            url = findInUrlClassLoader(resource, classLoader);

Review Comment:
   **`findInUrlClassLoader` silently uses only the first matching jar.** 
Multiple plugin jars with the same prefix → every jar after the first is 
dropped, no log. Minimum: log the choice. Ideally merge across all matches, or 
fail loudly when more than one matches.
   
   Also assumes `URLClassLoader`, not Flink's `ChildFirstClassLoader` — see the 
nested-jar thread on `materialize`.
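
A sketch of the "fail loudly" option using `ClassLoader.getResources`, which returns every match rather than the first. `ClasspathMatches` is a hypothetical helper. Caveat: as far as I can tell, `getResources` has the same blind spot as `getResource` for JARs without explicit directory entries, so this shows the ambiguity policy only and does not replace the fallback scan.

```java
import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper that surfaces every classpath match instead of the first one. */
final class ClasspathMatches {
    private ClasspathMatches() {}

    /** All URLs the classloader (and its parents) report for the resource name. */
    static List<URL> findAll(ClassLoader cl, String resource) throws IOException {
        return Collections.list(cl.getResources(resource));
    }

    /** Returns the single match, or fails with a message that lists every candidate. */
    static URL requireSingle(ClassLoader cl, String resource) throws IOException {
        List<URL> matches = findAll(cl, resource);
        if (matches.isEmpty()) {
            throw new IllegalArgumentException("Classpath resource not found: " + resource);
        }
        if (matches.size() > 1) {
            throw new IllegalStateException(
                    "Ambiguous classpath resource '" + resource + "': " + matches);
        }
        return matches.get(0);
    }
}
```

Merging across all matches would iterate `findAll` instead of calling `requireSingle`.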



##########
python/flink_agents/api/skills.py:
##########
@@ -43,23 +59,56 @@ def my_skills() -> Skills:
 class Skills(SerializableResource):
     """A resource describing where to load agent skills from.
 
-    Use :meth:`from_local_dir` to construct — direct field construction is
-    reserved for internal serialization and not part of the public API.
+    Use one of the ``from_*`` factory methods to construct — direct field
+    construction is reserved for internal serialization and not part of the
+    public API.
     """
 
-    # Internal location list populated through the factory methods; kept
-    # as a public pydantic field so it can be serialized/deserialized.
+    # Filesystem paths. Each entry may be a directory whose immediate
+    # subdirectories each contain a ``SKILL.md`` file, or a ``.zip`` file
+    # whose top-level entries are the skill subdirectories.
     paths: List[str] = Field(default_factory=list)
 
+    # http(s) URLs. Each URL must point to a ``.zip`` whose top level is
+    # the baseDir.
+    urls: List[str] = Field(default_factory=list)
+
+    # (package_name, resource_path) pairs. Each entry references an installed
+    # Python package and a path inside it; the resource may be a directory or
+    # a ``.zip`` file.
+    packages: List[Tuple[str, str]] = Field(default_factory=list)
+
     @classmethod
     def from_local_dir(cls, *paths: str) -> Skills:
-        """Create a Skills resource from one or more local filesystem directories.
+        """Create a Skills resource from one or more local paths.
 
-        Each path points to a directory whose immediate subdirectories each
-        contain a ``SKILL.md`` file.
+        Each path may be a directory or a ``.zip`` file. For a directory, its
+        immediate subdirectories must each contain a ``SKILL.md`` file. For
+        a zip, its top-level entries are the skill subdirectories.
         """
         return cls(paths=list(paths))
 
+    @classmethod
+    def from_url(cls, *urls: str) -> Skills:
+        """Create a Skills resource from one or more http(s) URLs.
+
+        Each URL must point to a ``.zip`` whose top level is the baseDir
+        (i.e. skill subdirectories sit at the top of the zip).
+        """
+        return cls(urls=list(urls))
+
+    @classmethod
+    def from_package(cls, package: str, resource: str) -> Skills:

Review Comment:
   **API asymmetry.** Java `fromClasspath(String...)` is varargs; Python 
`from_package(package, resource)` is single-pair, so two packages require two 
`@skills` functions.
   
   Make it symmetric: `def from_package(cls, *pairs: Tuple[str, str])`. See 
also the cross-language drop thread on the `packages` field.



##########
plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareSkillsTest.java:
##########
@@ -126,6 +127,6 @@ void programmaticSkillsAddResourceParticipates() throws Exception {
                                 .provide(
                                         org.apache.flink.agents.api.resource.ResourceContext
                                                 .fromGetResource((n, t) -> null));
-        assertEquals(java.util.List.of("/tmp/skill-d"), merged.getPaths());
+        assertEquals(List.of("/tmp/skill-d"), merged.getPaths());

Review Comment:
   **Merge-coverage gap.** `AgentPlan.addSkills` now merges three 
`LinkedHashSet`s but the test only asserts on `getPaths()`. Add cases for 
overlapping `urls`, overlapping `classpathResources`, and a mixed three-source 
case asserting all three end up in the merged result.
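
The shape of the missing assertions, sketched against a stand-in because the real `AgentPlan.addSkills` signature is not visible in this hunk. `MergedSkillsSketch` and `MergeCoverageSketch` are hypothetical; only the `LinkedHashSet` merge semantics (insertion order kept, duplicates dropped) are taken from the comment above.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for the merged Skills state; not the real AgentPlan API. */
final class MergedSkillsSketch {
    private final Set<String> paths = new LinkedHashSet<>();
    private final Set<String> urls = new LinkedHashSet<>();
    private final Set<String> classpathResources = new LinkedHashSet<>();

    MergedSkillsSketch add(List<String> p, List<String> u, List<String> c) {
        paths.addAll(p);
        urls.addAll(u);
        classpathResources.addAll(c);
        return this;
    }

    List<String> getPaths() {
        return new ArrayList<>(paths);
    }

    List<String> getUrls() {
        return new ArrayList<>(urls);
    }

    List<String> getClasspathResources() {
        return new ArrayList<>(classpathResources);
    }
}

final class MergeCoverageSketch {
    private MergeCoverageSketch() {}

    /** Mixed three-source case with overlaps in every list. */
    static void mixedSourcesAreMergedAndDeduplicated() {
        MergedSkillsSketch merged =
                new MergedSkillsSketch()
                        .add(
                                List.of("/tmp/skill-a"),
                                List.of("https://example.invalid/a.zip"),
                                List.of("skills"))
                        .add(
                                List.of("/tmp/skill-a", "/tmp/skill-d"),
                                List.of("https://example.invalid/a.zip"),
                                List.of("skills", "more-skills"));
        expect(List.of("/tmp/skill-a", "/tmp/skill-d"), merged.getPaths());
        expect(List.of("https://example.invalid/a.zip"), merged.getUrls());
        expect(List.of("skills", "more-skills"), merged.getClasspathResources());
    }

    private static void expect(List<String> want, List<String> got) {
        if (!want.equals(got)) {
            throw new AssertionError("expected " + want + " but got " + got);
        }
    }
}
```

In the real test these would be `assertEquals` calls on `merged.getUrls()` and `merged.getClasspathResources()` next to the existing `getPaths()` assertion.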



##########
python/flink_agents/api/skills.py:
##########
@@ -43,23 +59,56 @@ def my_skills() -> Skills:
 class Skills(SerializableResource):
     """A resource describing where to load agent skills from.
 
-    Use :meth:`from_local_dir` to construct — direct field construction is
-    reserved for internal serialization and not part of the public API.
+    Use one of the ``from_*`` factory methods to construct — direct field
+    construction is reserved for internal serialization and not part of the
+    public API.
     """
 
-    # Internal location list populated through the factory methods; kept
-    # as a public pydantic field so it can be serialized/deserialized.
+    # Filesystem paths. Each entry may be a directory whose immediate
+    # subdirectories each contain a ``SKILL.md`` file, or a ``.zip`` file
+    # whose top-level entries are the skill subdirectories.
     paths: List[str] = Field(default_factory=list)
 
+    # http(s) URLs. Each URL must point to a ``.zip`` whose top level is
+    # the baseDir.
+    urls: List[str] = Field(default_factory=list)

Review Comment:
   **Cross-language deserialization silently drops fields.** Python writes 
`packages` → Java drops it → 0 skills loaded, no warning. The mirror case holds 
for Java `classpathResources` → Python.
   
   Short term: emit a WARN on unrecognized source fields during 
deserialization. Long term: unify under one cross-language scheme (see the 
`sources` discriminator suggested on `Skills.java`).
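
The short-term WARN could be as small as the sketch below. It assumes the deserializer can expose the raw field names as a map and that the wire names match the Java field names (`paths`, `urls`, `classpathResources`); both are assumptions, as is the `UnknownFieldCheck` class. `java.util.logging` stands in for the project's logger.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.logging.Logger;

/** Hypothetical check run during deserialization of a Skills resource. */
final class UnknownFieldCheck {
    private static final Logger LOG = Logger.getLogger(UnknownFieldCheck.class.getName());

    /** Assumed wire names of the source fields the Java side understands. */
    static final Set<String> JAVA_SOURCE_FIELDS = Set.of("paths", "urls", "classpathResources");

    private UnknownFieldCheck() {}

    /** Returns the field names that are not in the known set, logging a WARN if any exist. */
    static Set<String> unrecognized(Map<String, ?> rawFields, Set<String> known) {
        Set<String> unknown = new TreeSet<>(rawFields.keySet());
        unknown.removeAll(known);
        if (!unknown.isEmpty()) {
            LOG.warning("Ignoring unrecognized skill source fields: " + unknown);
        }
        return unknown;
    }
}
```

A payload written by Python with a `packages` field would then at least leave a trace in the Java logs.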



##########
runtime/src/main/java/org/apache/flink/agents/runtime/skill/repository/URLSkillRepository.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.skill.repository;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+/**
+ * Skill repository backed by an http(s) URL pointing to a zip.
+ *
+ * <p>The zip is downloaded to a temp file and extracted into a process-local temp directory
+ * (cleaned up at JVM exit). The downloaded zip itself is removed once extraction completes.
+ */
+public class URLSkillRepository extends FileSystemSkillRepository {
+
+    private static final int REQUEST_TIMEOUT_MS = 90_000;
+
+    public URLSkillRepository(String url) throws IOException {
+        super(materialize(url));
+    }
+
+    private static Path materialize(String url) throws IOException {

Review Comment:
   **URL zip is loaded with no integrity verification** — vulnerable to 
truncated downloads, stale CDN cache, MITM with a private CA, and proxy 
rewrites.
   
   Suggest `Skills.fromUrl(url, "sha256:…")`, with the digest verified before 
extraction and made mandatory in production. The same place can take an 
optional host allowlist for SSRF mitigation.
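
A sketch of the verification step, assuming the expected value is passed as `sha256:<hex>` as in the suggested signature. `Sha256Verifier` and its method names are hypothetical; only JDK classes are used.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Locale;

/** Hypothetical helper that checks a downloaded zip before it is extracted. */
final class Sha256Verifier {
    private static final String PREFIX = "sha256:";

    private Sha256Verifier() {}

    /** Streams the file through SHA-256 and returns the lowercase hex digest. */
    static String sha256Hex(Path file) throws IOException {
        MessageDigest digest;
        try {
            digest = MessageDigest.getInstance("SHA-256");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is not available", e);
        }
        byte[] buffer = new byte[8192];
        try (InputStream in = Files.newInputStream(file)) {
            int read;
            while ((read = in.read(buffer)) != -1) {
                digest.update(buffer, 0, read);
            }
        }
        StringBuilder hex = new StringBuilder();
        for (byte b : digest.digest()) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    /** Throws IOException on mismatch so the caller never reaches extraction. */
    static void verify(Path file, String expected) throws IOException {
        if (!expected.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Expected 'sha256:<hex>' but got: " + expected);
        }
        String want = expected.substring(PREFIX.length()).toLowerCase(Locale.ROOT);
        String got = sha256Hex(file);
        boolean same =
                MessageDigest.isEqual(
                        want.getBytes(StandardCharsets.US_ASCII),
                        got.getBytes(StandardCharsets.US_ASCII));
        if (!same) {
            throw new IOException("Checksum mismatch for " + file + ": got sha256:" + got);
        }
    }
}
```

Calling `verify` on the downloaded temp file before `extractZipSafely` would reject truncated or rewritten archives before anything is unpacked.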



##########
runtime/src/main/java/org/apache/flink/agents/runtime/skill/repository/SkillMaterializer.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.skill.repository;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.Comparator;
+import java.util.Enumeration;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+import java.util.stream.Stream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+
+/**
+ * Internal helpers for materializing skill sources (zip files, URL downloads, classpath JAR
+ * entries) into a local temp directory. Cleanup is registered as a JVM shutdown hook so the temp
+ * directories are removed when the process exits.
+ */
+public final class SkillMaterializer {
+
+    private static final String TEMP_DIR_PREFIX = "flink-agents-skills-";
+
+    private SkillMaterializer() {}
+
+    /**
+     * Register a JVM shutdown hook that removes {@code path} recursively. Failures during deletion
+     * are silently ignored (best-effort cleanup).
+     */
+    public static void registerCleanup(Path path) {
+        Thread hook =
+                new Thread(() -> deleteRecursively(path), "skill-cleanup-" + path.getFileName());
+        Runtime.getRuntime().addShutdownHook(hook);
+    }
+
+    /**
+     * Extract a zip into a fresh temp directory and return that directory. Validates every entry
+     * against zip-slip (paths must resolve inside the extraction directory). Registers a JVM
+     * shutdown hook to remove the extraction directory at process exit.
+     *
+     * @throws IOException if any zip entry resolves outside the extraction directory.
+     */
+    public static Path extractZipSafely(Path zipPath) throws IOException {

Review Comment:
   **Missing zip-bomb protection.** No cap on entry count, per-entry size, 
total size, or compression ratio — a small DEFLATE bomb can exhaust TaskManager 
disk/memory.
   
   Add limits (e.g. ≤ 50 MB per entry, ≤ 200 MB total, ≤ 10 000 entries, reject 
ratio > 100:1) and tests asserting the rejection.
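
   One way the limits could be enforced, as a sketch only: count the bytes actually read from each entry instead of trusting the sizes declared in the archive. `BoundedZipExtractor` and its constructor parameters are hypothetical names, and the thresholds are whatever the caller passes in:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Enumeration;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

/** Hypothetical sketch (not in the PR): zip extraction with zip-slip and zip-bomb limits. */
final class BoundedZipExtractor {
    private final long maxEntryBytes;
    private final long maxTotalBytes;
    private final int maxEntries;
    private final long maxRatio;

    BoundedZipExtractor(long maxEntryBytes, long maxTotalBytes, int maxEntries, long maxRatio) {
        this.maxEntryBytes = maxEntryBytes;
        this.maxTotalBytes = maxTotalBytes;
        this.maxEntries = maxEntries;
        this.maxRatio = maxRatio;
    }

    /** Extracts zipPath into targetDir, aborting with IOException once any limit is exceeded. */
    void extract(Path zipPath, Path targetDir) throws IOException {
        Path root = targetDir.toAbsolutePath().normalize();
        long total = 0;
        int count = 0;
        try (ZipFile zip = new ZipFile(zipPath.toFile())) {
            Enumeration<? extends ZipEntry> entries = zip.entries();
            while (entries.hasMoreElements()) {
                ZipEntry entry = entries.nextElement();
                if (++count > maxEntries) {
                    throw new IOException("Too many zip entries (limit " + maxEntries + ")");
                }
                Path out = root.resolve(entry.getName()).normalize();
                if (!out.startsWith(root)) {
                    throw new IOException("Zip entry escapes target dir: " + entry.getName());
                }
                if (entry.isDirectory()) {
                    Files.createDirectories(out);
                    continue;
                }
                Files.createDirectories(out.getParent());
                // Compressed size may be reported as -1 or 0; clamp to 1 to keep the ratio defined.
                long compressed = Math.max(entry.getCompressedSize(), 1);
                long written = 0;
                try (InputStream in = zip.getInputStream(entry);
                        OutputStream os = Files.newOutputStream(out)) {
                    byte[] buf = new byte[8192];
                    int n;
                    while ((n = in.read(buf)) != -1) {
                        written += n;
                        total += n;
                        if (written > maxEntryBytes) {
                            throw new IOException("Entry too large: " + entry.getName());
                        }
                        if (total > maxTotalBytes) {
                            throw new IOException("Archive exceeds total size limit");
                        }
                        if (written > compressed * maxRatio) {
                            throw new IOException("Suspicious ratio: " + entry.getName());
                        }
                        os.write(buf, 0, n);
                    }
                }
            }
        }
    }
}
```

   With the numbers suggested above this would be `new BoundedZipExtractor(50L << 20, 200L << 20, 10_000, 100)`. Two caveats: a rejected archive leaves partially written files behind, so the caller should delete the target directory on failure, and a strict ratio check can reject small, highly repetitive but legitimate files.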



##########
runtime/src/main/java/org/apache/flink/agents/runtime/skill/SkillManager.java:
##########
@@ -44,7 +47,7 @@ public class SkillManager {
 
     public SkillManager(Skills config) {

Review Comment:
   **Constructor is synchronous, blocking, and not recoverable.** `loadAll()` 
fetches sources serially; one slow URL stalls startup, one failure kills the 
whole `SkillManager`, and `repos` is `final` so there's no hot reload.
   
   Follow-up: parallel materialization (`CompletableFuture`), fail-soft mode 
with a failure list, lazy load on first `getSkill()`, optional `reloadSource()` 
API.
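
   A rough sketch of the parallel, fail-soft part, assuming each source can be loaded independently by some function from its location string. `FailSoftLoader`, `Result` and the `loader` parameter are hypothetical; the real `SkillManager` wiring would differ:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Hypothetical sketch (not in the PR): load sources in parallel and collect failures. */
final class FailSoftLoader {

    /** Successfully loaded items plus a per-source failure map, both in submission order. */
    static final class Result<R> {
        final List<R> loaded = new ArrayList<>();
        final Map<String, Throwable> failures = new LinkedHashMap<>();
    }

    private FailSoftLoader() {}

    static <R> Result<R> loadAll(
            List<String> sources, Function<String, R> loader, int parallelism) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, parallelism));
        try {
            // Submit every source up front so slow ones overlap instead of queueing serially.
            List<CompletableFuture<R>> futures = new ArrayList<>();
            for (String source : sources) {
                futures.add(CompletableFuture.supplyAsync(() -> loader.apply(source), pool));
            }
            Result<R> result = new Result<>();
            for (int i = 0; i < sources.size(); i++) {
                try {
                    result.loaded.add(futures.get(i).join());
                } catch (CompletionException e) {
                    // join() wraps the loader's exception; record the cause and keep going.
                    Throwable cause = e.getCause() != null ? e.getCause() : e;
                    result.failures.put(sources.get(i), cause);
                }
            }
            return result;
        } finally {
            pool.shutdown();
        }
    }
}
```

   Since `Function` cannot throw checked exceptions, a loader that does I/O would need to wrap `IOException` (for example in `UncheckedIOException`). Whether a non-empty `failures` map should abort startup is a policy decision this sketch leaves to the caller.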



##########
runtime/src/main/java/org/apache/flink/agents/runtime/skill/repository/SkillMaterializer.java:
##########
@@ -0,0 +1,201 @@
+package org.apache.flink.agents.runtime.skill.repository;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.Comparator;
+import java.util.Enumeration;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+import java.util.stream.Stream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+
+/**
+ * Internal helpers for materializing skill sources (zip files, URL downloads, classpath JAR
+ * entries) into a local temp directory. Cleanup is registered as a JVM shutdown hook so the temp
+ * directories are removed when the process exits.
+ */
+public final class SkillMaterializer {
+
+    private static final String TEMP_DIR_PREFIX = "flink-agents-skills-";
+
+    private SkillMaterializer() {}
+
+    /**
+     * Register a JVM shutdown hook that removes {@code path} recursively. Failures during deletion
+     * are silently ignored (best-effort cleanup).
+     */
+    public static void registerCleanup(Path path) {

Review Comment:
   **Shutdown hooks accumulate unboundedly.** Each zip / URL / classpath repo 
registers one hook; long-running processes that rebuild `SkillManager` 
(retries, hot reload) leak hooks monotonically.
   
   Fix: cache materializations by source digest (best, since it eliminates the 
leak entirely), or expose `close()` that calls `removeShutdownHook` and deletes 
immediately. At minimum, document in the javadoc that `SkillManager` must not be 
reconstructed in a loop.
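
   A sketch of the `close()` option, assuming each materialized directory is wrapped in a handle that owns its hook. `TempDirHandle` is a hypothetical name, and the recursive delete only mirrors the best-effort behaviour described in the javadoc above:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

/** Hypothetical sketch (not in the PR): a temp dir whose shutdown hook can be released early. */
final class TempDirHandle implements AutoCloseable {
    private final Path dir;
    private final Thread hook;
    private boolean closed;

    TempDirHandle(Path dir) {
        this.dir = dir;
        this.hook = new Thread(() -> deleteRecursively(dir), "skill-cleanup-" + dir.getFileName());
        Runtime.getRuntime().addShutdownHook(hook);
    }

    Path path() {
        return dir;
    }

    /** Unregisters the hook and deletes the directory now; calling it twice is a no-op. */
    @Override
    public synchronized void close() {
        if (closed) {
            return;
        }
        closed = true;
        try {
            Runtime.getRuntime().removeShutdownHook(hook);
        } catch (IllegalStateException e) {
            // The JVM is already shutting down, so the registered hook handles deletion.
            return;
        }
        deleteRecursively(dir);
    }

    /** Best-effort recursive delete: children first, failures ignored. */
    private static void deleteRecursively(Path root) {
        if (!Files.exists(root)) {
            return;
        }
        try (Stream<Path> walk = Files.walk(root)) {
            walk.sorted(Comparator.reverseOrder())
                    .forEach(
                            p -> {
                                try {
                                    Files.deleteIfExists(p);
                                } catch (IOException ignored) {
                                    // best-effort cleanup
                                }
                            });
        } catch (IOException ignored) {
            // best-effort cleanup
        }
    }
}
```

   A `SkillManager` that is rebuilt would close the handles of its old repositories, so the number of registered hooks tracks live repositories rather than growing with every rebuild.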



##########
api/src/main/java/org/apache/flink/agents/api/skills/Skills.java:
##########
@@ -51,33 +57,76 @@ public class Skills extends SerializableResource {
     /** Reserved name of the built-in bash tool used to execute skill scripts. */
     public static final String BASH_TOOL = "bash";
 
-    private List<String> paths;
+    private final List<String> paths;
+    private final List<String> urls;
+    private final List<String> classpathResources;
 
     /** Required by Jackson. */
     public Skills() {
         this.paths = Collections.emptyList();
+        this.urls = Collections.emptyList();
+        this.classpathResources = Collections.emptyList();
     }
 
     @JsonCreator
-    public Skills(@JsonProperty("paths") List<String> paths) {
+    public Skills(

Review Comment:
   **Source-incompatible constructor change.** Wire compatibility is fine, but 
`new Skills(List.of("/data/skills"))` no longer compiles. Either keep a 
`@Deprecated` 1-arg constructor that forwards to the new one, or call the break 
out in the 0.3.0 release notes.
   
   If `Skills` later migrates to a single `sources` field (see the 
discriminator thread), this signature changes again — better to break once.
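
   The forwarding option could look roughly like this. `SkillsSketch` is a simplified, hypothetical stand-in for `Skills`: the Jackson annotations and the `SerializableResource` base class are left out so the sketch compiles on its own, and the getter names are assumptions:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for Skills showing a source-compatible 1-arg constructor. */
final class SkillsSketch {
    private final List<String> paths;
    private final List<String> urls;
    private final List<String> classpathResources;

    /** New 3-arg form; in the real class this is where @JsonCreator would sit. */
    SkillsSketch(List<String> paths, List<String> urls, List<String> classpathResources) {
        this.paths = copyOrEmpty(paths);
        this.urls = copyOrEmpty(urls);
        this.classpathResources = copyOrEmpty(classpathResources);
    }

    /** Kept so existing call sites compile; forwards with empty lists for the new sources. */
    @Deprecated
    SkillsSketch(List<String> paths) {
        this(paths, Collections.<String>emptyList(), Collections.<String>emptyList());
    }

    List<String> getPaths() {
        return paths;
    }

    List<String> getUrls() {
        return urls;
    }

    List<String> getClasspathResources() {
        return classpathResources;
    }

    /** Null-safe defensive copy so the instance stays immutable. */
    private static List<String> copyOrEmpty(List<String> in) {
        if (in == null) {
            return Collections.<String>emptyList();
        }
        return Collections.unmodifiableList(new ArrayList<>(in));
    }
}
```

   Existing callers keep compiling with a deprecation warning, which gives them a release cycle to move to whatever the final signature turns out to be.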



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.