GitHub user yanand0909 created a discussion: [Feature] Dynamic Runtime Loading 
and Refresh of MCP Tools and Prompts

**GitHub Issue:** #458

## Motivation

### Current MCP Resource Discovery

The Flink Agents framework currently discovers MCP tools and prompts at 
**compile time** during `AgentPlan` construction. In 
`AgentPlan.extractJavaMCPServer()`, the framework:

1. Instantiates the MCP server via `JavaResourceProvider.provide()`
2. Calls `listTools()` and `listPrompts()` over the network
3. Serializes discovered resources as `JavaSerializableResourceProvider` 
instances
4. Closes the MCP server connection

```java
// Current behavior in AgentPlan.extractJavaMCPServer()
// (plan/src/.../AgentPlan.java:407)
JavaResourceProvider provider =
    new JavaResourceProvider(name, MCP_SERVER, descriptor);
addResourceProvider(provider);
Object mcpServer = provider.provide(null);  // Instantiates MCP server NOW

// Discovers tools over the network at compile time
Iterable<? extends SerializableResource> tools =
    (Iterable<? extends SerializableResource>) listToolsMethod.invoke(mcpServer);

// ... registers each tool as a static resource provider ...

closeMethod.invoke(mcpServer);  // Closes connection immediately
```

### The Problem

This compile-time discovery approach has several limitations:

1. **Build-time server dependency**: The MCP server must be reachable during 
`AgentPlan` construction. If the server is down, the build fails — even though 
the agent won't run until later.

2. **Static capabilities**: Tools and prompts are frozen at compile time. If an 
MCP server adds, removes, or updates its tools, the agent cannot see the 
changes without recompilation.

3. **No connection reuse**: The MCP server connection is opened for discovery 
and immediately closed. At runtime, a new connection must be established for 
tool calls, duplicating work.

4. **No graceful degradation**: A transient MCP server outage during 
compilation is a hard failure with no recovery path.

### Proposed Solution

We propose a two-part solution:

1. **Runtime Discovery** — Defer MCP tool/prompt discovery from compile-time to 
operator startup (`ActionExecutionOperator.open()`). This decouples the build 
from MCP server availability and enables connection reuse.

2. **Dynamic Refresh** — Allow MCP tools and prompts to be refreshed at 
runtime, so agents can adapt to changes in MCP server capabilities without 
redeployment.

---

## Public Interface

### MCPServer Builder Additions

New configuration options on the existing `MCPServer.Builder`:

```java
MCPServer server = MCPServer.builder("http://api.example.com/mcp")
    .auth(new BearerTokenAuth(System.getenv("API_TOKEN")))
    .timeout(Duration.ofSeconds(30))
    // Startup behavior (default: true; fail the operator if the server is unreachable)
    .failFastOnStartup(true)
    // Dynamic refresh configuration
    .autoRefresh(true)              // default: false — enable periodic refresh
    .refreshInterval(Duration.ofMinutes(5))  // default: 5 minutes
    .build();
```

### New Public API on MCPServer

```java
public class MCPServer extends Resource {
    // Manual refresh (method signatures only; bodies omitted)
    RefreshResult refreshNow();

    // Automatic refresh lifecycle
    void startAutoRefresh();
    void stopAutoRefresh();
    boolean isAutoRefreshRunning();

    // Observability
    long getLastRefreshTime();
}
```

### Key Points

- The `@MCPServer` annotation API is **unchanged**. Existing user code compiles 
and runs without modification.
- Discovery moves transparently from plan compilation to operator startup.
- Dynamic refresh is **opt-in** via `autoRefresh(true)`. Without it, tools are 
discovered once at startup (current behavior, just deferred).
- `failFastOnStartup(true)` (default) preserves existing fail-fast semantics. 
Setting it to `false` enables graceful degradation — the operator starts 
without MCP tools if the server is unreachable.

---

## Implementation Details

### Part 1: Runtime Discovery

#### 1.1 New MCPServerResourceProvider

A new `ResourceProvider` that stores only MCP server configuration at compile 
time and defers instantiation and discovery to runtime.

**File:** 
`plan/src/main/java/.../resourceprovider/MCPServerResourceProvider.java` (NEW)

```java
public class MCPServerResourceProvider extends ResourceProvider {
    private final ResourceDescriptor descriptor;
    private transient MCPServer serverInstance;
    private transient Map<String, MCPTool> discoveredTools;
    private transient Map<String, MCPPrompt> discoveredPrompts;

    public MCPServerResourceProvider(String name, ResourceDescriptor 
descriptor) {
        super(name, ResourceType.MCP_SERVER);
        this.descriptor = descriptor;
    }

    @Override
    public Resource provide(BiFunction<String, ResourceType, Resource> 
getResource) {
        if (serverInstance == null) {
            serverInstance = createMCPServer();
        }
        return serverInstance;
    }

    public void discoverResources() throws Exception {
        if (serverInstance == null) { provide(null); }

        discoveredTools = new HashMap<>();
        for (MCPTool tool : serverInstance.listTools()) {
            discoveredTools.put(tool.getName(), tool);
        }

        discoveredPrompts = new HashMap<>();
        for (MCPPrompt prompt : serverInstance.listPrompts()) {
            discoveredPrompts.put(prompt.getName(), prompt);
        }
    }

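    // Accessors used by the operator after discoverResources() has run.
    public Map<String, MCPTool> getDiscoveredTools() {
        return discoveredTools;
    }

    public Map<String, MCPPrompt> getDiscoveredPrompts() {
        return discoveredPrompts;
    }

    public void close() throws Exception {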
        if (serverInstance != null) {
            serverInstance.close();
            serverInstance = null;
        }
    }
}
```

#### 1.2 Modify AgentPlan.extractJavaMCPServer()

**File:** `plan/src/main/java/.../AgentPlan.java` (lines 407-452)

Remove all MCP server instantiation, `listTools()`/`listPrompts()` calls, and 
`close()`. Replace with a simple provider registration:

```java
private void extractJavaMCPServer(Method method) throws Exception {
    String name = method.getName();
    ResourceDescriptor descriptor = (ResourceDescriptor) method.invoke(null);
    descriptor = new ResourceDescriptor(
        descriptor.getModule(),
        JAVA_MCP_SERVER_CLASS_NAME,
        new HashMap<>(descriptor.getInitialArguments()));

    // Store configuration only — no network calls
    MCPServerResourceProvider provider =
        new MCPServerResourceProvider(name, descriptor);
    addResourceProvider(provider);
}
```

**Impact:** `AgentPlan` construction no longer performs an MCP round-trip, so 
it is expected to drop from ~1-2s to <10ms. Builds no longer depend on MCP 
server availability.

#### 1.3 Runtime Discovery in ActionExecutionOperator

**File:** `runtime/src/main/java/.../operator/ActionExecutionOperator.java`

Add MCP resource discovery during `open()` and cleanup during `close()`:

```java
@Override
public void open() throws Exception {
    super.open();
    // ... existing initialization ...
    initMCPResources();
}

private void initMCPResources() throws Exception {
    Map<String, ResourceProvider> mcpProviders =
        agentPlan.getResourceProviders().get(ResourceType.MCP_SERVER);
    if (mcpProviders == null) return;

    for (var entry : mcpProviders.entrySet()) {
        if (!(entry.getValue() instanceof MCPServerResourceProvider)) continue;
        MCPServerResourceProvider provider =
            (MCPServerResourceProvider) entry.getValue();

        try {
            provider.discoverResources();

            for (var tool : provider.getDiscoveredTools().entrySet()) {
                agentPlan.cacheResource(
                    tool.getKey(), ResourceType.TOOL, tool.getValue());
            }
            for (var prompt : provider.getDiscoveredPrompts().entrySet()) {
                agentPlan.cacheResource(
                    prompt.getKey(), ResourceType.PROMPT, prompt.getValue());
            }
        } catch (Exception e) {
            if (provider.isFailFastOnStartup()) {
                throw new RuntimeException(
                    "MCP discovery failed for: " + entry.getKey(), e);
            }
            // Pass the exception so the cause of the failed discovery is logged.
            LOG.warn("Continuing without MCP server '{}': tools unavailable",
                entry.getKey(), e);
        }
    }
}
```

#### 1.4 AgentPlan Cache Helper

**File:** `plan/src/main/java/.../AgentPlan.java`

```java
/** Cache a resource directly (used by runtime MCP discovery). */
public void cacheResource(String name, ResourceType type, Resource resource) {
    resourceCache
        .computeIfAbsent(type, k -> new ConcurrentHashMap<>())
        .put(name, resource);
}
```

#### 1.5 Serialization Support

**Files:** `ResourceProviderJsonSerializer.java`, 
`ResourceProviderJsonDeserializer.java`

Add serialization/deserialization cases for `MCPServerResourceProvider` that 
persist only the `ResourceDescriptor` (no runtime state):

```java
// Serializer
if (value instanceof MCPServerResourceProvider) {
    MCPServerResourceProvider provider = (MCPServerResourceProvider) value;
    gen.writeStringField("type", "mcp_server");
    gen.writeFieldName("descriptor");
    serializerProvider.findValueSerializer(ResourceDescriptor.class)
        .serialize(provider.getDescriptor(), gen, serializerProvider);
}

// Deserializer
case "mcp_server":
    ResourceDescriptor desc = parser.readValueAs(ResourceDescriptor.class);
    return new MCPServerResourceProvider(name, desc);
```

---

### Part 2: Dynamic Refresh

Once runtime discovery is in place, dynamic refresh builds on the same 
infrastructure. We propose a phased approach with multiple strategies.

#### 2.1 Periodic Polling (Phase 1)

A background daemon thread periodically calls `refreshNow()`:

```java
public void startAutoRefresh() {
    if (!autoRefreshEnabled || refreshScheduler != null) return;

    refreshScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "MCP-Refresh-" + endpoint);
        t.setDaemon(true);
        return t;
    });

    refreshScheduler.scheduleAtFixedRate(
        () -> {
            try {
                refreshNow();
            } catch (Exception e) {
                LOG.warn("Periodic refresh failed for {}", endpoint, e);
            }
        },
        refreshInterval.toMillis(),
        refreshInterval.toMillis(),
        TimeUnit.MILLISECONDS);
}
```

**Flink Integration:** `ActionExecutionOperator.open()` calls 
`startAutoRefresh()` on each MCP server after initial discovery. `close()` 
calls `stopAutoRefresh()`.
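
The proposal shows `startAutoRefresh()` but not its counterpart. The following is a minimal sketch of a matching start/stop lifecycle, not the actual `MCPServer` code. `RefreshLoop` is a hypothetical stand-in class, and its `Runnable` argument stands in for `refreshNow()`.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the refresh lifecycle of MCPServer. */
class RefreshLoop {
    private final Runnable refreshTask;   // stands in for refreshNow()
    private final Duration interval;
    private ScheduledExecutorService scheduler;

    RefreshLoop(Runnable refreshTask, Duration interval) {
        this.refreshTask = refreshTask;
        this.interval = interval;
    }

    synchronized void start() {
        if (scheduler != null) return;    // already running
        scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "MCP-Refresh-sketch");
            t.setDaemon(true);
            return t;
        });
        scheduler.scheduleAtFixedRate(() -> {
            // An uncaught exception would cancel all later runs, so swallow it here.
            try {
                refreshTask.run();
            } catch (Exception e) {
                // a real implementation would log the failure
            }
        }, interval.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS);
    }

    synchronized void stop() {
        if (scheduler == null) return;    // not running
        scheduler.shutdownNow();          // cancel pending runs, interrupt an in-flight one
        try {
            scheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        scheduler = null;                 // allow a later restart
    }

    synchronized boolean isRunning() {
        return scheduler != null;
    }
}
```

Resetting the scheduler field to `null` in `stop()` keeps `start()` reusable after a stop. Whether an operator restart should reuse the same server instance is left open here.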

#### 2.3 Thread Safety

Tool reads are frequent (every tool call); refreshes are infrequent. We use 
`StampedLock`, whose optimistic reads avoid lock contention when writes are 
rare:

```java
private final StampedLock lock = new StampedLock();

public MCPTool getTool(String name) {
    // Optimistic read — no lock contention on the happy path
    long stamp = lock.tryOptimisticRead();
    MCPTool tool = toolsCache.get(name);

    if (!lock.validate(stamp)) {
        // Fallback to pessimistic read
        stamp = lock.readLock();
        try {
            tool = toolsCache.get(name);
        } finally {
            lock.unlockRead(stamp);
        }
    }
    return tool;
}

public void updateTools(List<MCPTool> newTools) {
    long stamp = lock.writeLock();
    try {
        toolsCache.clear();
        newTools.forEach(t -> toolsCache.put(t.getName(), t));
    } finally {
        lock.unlockWrite(stamp);
    }
}
```
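
One caveat with the sketch above: an optimistic read does not stop a concurrent writer from mutating the map mid-lookup, so `toolsCache` would itself need to tolerate concurrent reads (a plain `HashMap` does not). A simpler alternative, offered as an assumption rather than as part of the proposal, is to publish an immutable snapshot through a `volatile` field. `SnapshotCache` is a hypothetical class, and `T` stands in for `MCPTool`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical copy-on-write cache; T stands in for MCPTool. */
class SnapshotCache<T> {
    private final Function<T, String> nameOf;
    // Readers only ever see a fully built, immutable map.
    private volatile Map<String, T> snapshot = Map.of();

    SnapshotCache(Function<T, String> nameOf) {
        this.nameOf = nameOf;
    }

    /** Lock-free read against the current snapshot. */
    T get(String name) {
        return snapshot.get(name);
    }

    /** Builds the new map off to the side, then swaps it in with one volatile write. */
    void replaceAll(List<T> items) {
        Map<String, T> next = new LinkedHashMap<>();
        for (T item : items) {
            next.put(nameOf.apply(item), item);
        }
        snapshot = Map.copyOf(next);
    }

    int size() {
        return snapshot.size();
    }
}
```

The trade-off is one map copy per refresh, which is cheap when refreshes happen every few minutes and tool lists are small.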

#### 2.4 Graceful Degradation

On refresh failures, the server falls back to the last known good state:

```java
private Map<String, MCPTool> fallbackTools;  // Last successful discovery
private int consecutiveRefreshFailures = 0;

private void handleRefreshFailure(Exception e) {
    consecutiveRefreshFailures++;
    if (consecutiveRefreshFailures >= maxConsecutiveFailures) {
        connectionState = ConnectionState.FAILED;
        // Restore fallback
        toolsCache.clear();
        toolsCache.putAll(fallbackTools);
    } else {
        connectionState = ConnectionState.DEGRADED;
        // Keep using current cache
    }
}
```
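
The snippet above never shows where `fallbackTools` is recorded or where the failure counter resets. A minimal sketch of the full cycle follows, assuming the fallback is captured on every successful refresh. `RefreshTracker`, `handleRefreshSuccess`, and the `HEALTHY` state are hypothetical names, `String` values stand in for `MCPTool`, and the class is not thread-safe on its own.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical tracker for refresh outcomes; String values stand in for MCPTool. */
class RefreshTracker {
    // HEALTHY is an assumed state; the proposal only names DEGRADED and FAILED.
    enum ConnectionState { HEALTHY, DEGRADED, FAILED }

    private final int maxConsecutiveFailures;
    private final Map<String, String> toolsCache = new HashMap<>();
    private Map<String, String> fallbackTools = new HashMap<>();
    private int consecutiveRefreshFailures = 0;
    private ConnectionState connectionState = ConnectionState.HEALTHY;

    RefreshTracker(int maxConsecutiveFailures) {
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    /** A successful refresh replaces the cache and becomes the new fallback. */
    void handleRefreshSuccess(Map<String, String> discovered) {
        toolsCache.clear();
        toolsCache.putAll(discovered);
        fallbackTools = new HashMap<>(discovered);
        consecutiveRefreshFailures = 0;
        connectionState = ConnectionState.HEALTHY;
    }

    /** Mirrors the failure handling shown above. */
    void handleRefreshFailure() {
        consecutiveRefreshFailures++;
        if (consecutiveRefreshFailures >= maxConsecutiveFailures) {
            connectionState = ConnectionState.FAILED;
            toolsCache.clear();
            toolsCache.putAll(fallbackTools);   // restore last known good state
        } else {
            connectionState = ConnectionState.DEGRADED;
        }
    }

    ConnectionState state() {
        return connectionState;
    }

    Map<String, String> tools() {
        return Map.copyOf(toolsCache);
    }
}
```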

#### 2.5 Operator-Level vs Task-Manager-Level Refresh

| Approach | Threads | Isolation | Complexity |
|----------|---------|-----------|------------|
| **Operator-level** (recommended) | One per operator per MCP server | Full 
isolation between operators | Simple |
| Task-manager-level | One shared thread | Shared state across operators | 
Requires global registry |

We recommend **operator-level refresh** for simplicity and isolation. Each 
`ActionExecutionOperator` manages the refresh lifecycle of its own MCP servers.

---

## Future Work

### Event-Driven Refresh

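The MCP specification defines list-changed notifications 
(`notifications/tools/list_changed` and `notifications/prompts/list_changed`) 
for servers that advertise the `listChanged` capability. Subscribing to them 
would give near-instant tool updates: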

```java
MCPServer server = MCPServer.builder("http://api.example.com/mcp")
    .eventDrivenRefresh(true)  // Subscribe to SSE notifications
    .build();

server.addChangeListener(type -> {
    LOG.info("MCP resources updated: {}", type);
});
```

This would eliminate polling overhead entirely but requires MCP server-side 
support. The implementation should fall back to polling if the SSE connection 
drops.

### TTL-Based Lazy Refresh

An alternative cache strategy where tools are refreshed lazily on access after 
a configurable TTL expires. Good for development environments with sporadic 
tool usage — avoids background threads while still ensuring freshness.
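
As a rough illustration of the idea, the sketch below reloads the tool map on access once the TTL has elapsed. `TtlToolCache` is a hypothetical class, the `loader` stands in for a call such as `listTools()`, and the clock is injected only so that expiry can be exercised deterministically.

```java
import java.time.Duration;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical TTL cache; T stands in for MCPTool. */
class TtlToolCache<T> {
    private final Supplier<Map<String, T>> loader;   // stands in for listTools()
    private final long ttlNanos;
    private final LongSupplier nanoClock;            // e.g. System::nanoTime
    private Map<String, T> cached = Map.of();
    private long loadedAt;
    private boolean loaded = false;

    TtlToolCache(Supplier<Map<String, T>> loader, Duration ttl, LongSupplier nanoClock) {
        this.loader = loader;
        this.ttlNanos = ttl.toNanos();
        this.nanoClock = nanoClock;
    }

    /** Reloads on first access or once the TTL has elapsed, then serves from the cache. */
    synchronized T get(String name) {
        long now = nanoClock.getAsLong();
        if (!loaded || now - loadedAt >= ttlNanos) {
            cached = Map.copyOf(loader.get());
            loadedAt = now;
            loaded = true;
        }
        return cached.get(name);
    }
}
```

In this form the caller that hits an expired entry pays the reload latency, and a loader failure propagates to that caller. A real implementation would need to decide whether to serve stale data instead.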

### Advanced Resilience

- **Circuit breaker pattern** for failing MCP servers (prevent retry storms)
- **Tool versioning and schema validation** (detect incompatible tool signature 
changes)
- **Metrics export** via Flink's metrics system for Prometheus/Grafana 
dashboards
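
To make the circuit breaker item concrete, here is a minimal sketch under stated assumptions: `RefreshCircuitBreaker` is a hypothetical class, the threshold and cool-down are illustrative parameters, and the injected millisecond clock exists only to make the behavior easy to exercise. A refresh loop would call `allowRequest()` before contacting the server and report the outcome afterwards.

```java
import java.util.function.LongSupplier;

/** Hypothetical circuit breaker guarding refresh attempts against a failing server. */
class RefreshCircuitBreaker {
    private final int failureThreshold;
    private final long openMillis;          // cool-down before a probe is allowed
    private final LongSupplier clock;       // e.g. System::currentTimeMillis
    private int failures = 0;
    private long openedAt = -1;             // -1 means the circuit is closed

    RefreshCircuitBreaker(int failureThreshold, long openMillis, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
        this.clock = clock;
    }

    /** True when closed, or when the cool-down has elapsed and a probe may go through. */
    synchronized boolean allowRequest() {
        if (openedAt < 0) return true;
        return clock.getAsLong() - openedAt >= openMillis;
    }

    /** Any success closes the circuit and clears the failure count. */
    synchronized void recordSuccess() {
        failures = 0;
        openedAt = -1;
    }

    /** Opens (or re-opens) the circuit once the threshold is reached. */
    synchronized void recordFailure() {
        failures++;
        if (failures >= failureThreshold) {
            openedAt = clock.getAsLong();
        }
    }
}
```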

### Python MCP Server Support

Extend runtime discovery to Python MCP servers using the existing Python client.

## Other Agents' Approaches

- LangChain uses a polling approach to load tools and prompts dynamically.
- Claude Desktop and Claude Code use server-sent notifications to refresh the 
list of available tools.

GitHub link: https://github.com/apache/flink-agents/discussions/543
