GitHub user yanand0909 created a discussion: [Feature] Dynamic Runtime Loading
and Refresh of MCP Tools and Prompts
**GitHub Issue:** #458
## Motivation
### Current MCP Resource Discovery
The Flink Agents framework currently discovers MCP tools and prompts at
**compile time** during `AgentPlan` construction. In
`AgentPlan.extractJavaMCPServer()`, the framework:
1. Instantiates the MCP server via `JavaResourceProvider.provide()`
2. Calls `listTools()` and `listPrompts()` over the network
3. Serializes discovered resources as `JavaSerializableResourceProvider`
instances
4. Closes the MCP server connection
```java
// Current behavior in AgentPlan.extractJavaMCPServer()
// (plan/src/.../AgentPlan.java:407)
JavaResourceProvider provider =
        new JavaResourceProvider(name, MCP_SERVER, descriptor);
addResourceProvider(provider);
Object mcpServer = provider.provide(null); // Instantiates MCP server NOW

// Discovers tools over the network at compile time
Iterable<? extends SerializableResource> tools =
        (Iterable<? extends SerializableResource>) listToolsMethod.invoke(mcpServer);
// ... registers each tool as a static resource provider ...
closeMethod.invoke(mcpServer); // Closes connection immediately
```
### The Problem
This compile-time discovery approach has several limitations:
1. **Build-time server dependency**: The MCP server must be reachable during
`AgentPlan` construction. If the server is down, the build fails — even though
the agent won't run until later.
2. **Static capabilities**: Tools and prompts are frozen at compile time. If an
MCP server adds, removes, or updates its tools, the agent cannot see the
changes without recompilation.
3. **No connection reuse**: The MCP server connection is opened for discovery
and immediately closed. At runtime, a new connection must be established for
tool calls, duplicating work.
4. **No graceful degradation**: A transient MCP server outage during
compilation is a hard failure with no recovery path.
### Proposed Solution
We propose a two-part solution:
1. **Runtime Discovery** — Defer MCP tool/prompt discovery from compile-time to
operator startup (`ActionExecutionOperator.open()`). This decouples the build
from MCP server availability and enables connection reuse.
2. **Dynamic Refresh** — Allow MCP tools and prompts to be refreshed at
runtime, so agents can adapt to changes in MCP server capabilities without
redeployment.
---
## Public Interface
### MCPServer Builder Additions
New configuration options on the existing `MCPServer.Builder`:
```java
MCPServer server = MCPServer.builder("http://api.example.com/mcp")
    .auth(new BearerTokenAuth(System.getenv("API_TOKEN")))
    .timeout(Duration.ofSeconds(30))
    // Startup behavior
    .failFastOnStartup(true) // default: true (fail operator if server is unreachable)
    // Dynamic refresh configuration
    .autoRefresh(true) // default: false (enable periodic refresh)
    .refreshInterval(Duration.ofMinutes(5)) // default: 5 minutes
    .build();
```
### New Public API on MCPServer
```java
public class MCPServer extends Resource {
    // New methods (signatures only; bodies omitted)

    // On-demand refresh
    RefreshResult refreshNow();

    // Automatic refresh lifecycle
    void startAutoRefresh();
    void stopAutoRefresh();
    boolean isAutoRefreshRunning();

    // Observability
    long getLastRefreshTime();
}
```
### Key Points
- The `@MCPServer` annotation API is **unchanged**. Existing user code compiles
and runs without modification.
- Discovery moves transparently from plan compilation to operator startup.
- Dynamic refresh is **opt-in** via `autoRefresh(true)`. Without it, tools are
discovered once at startup (current behavior, just deferred).
- `failFastOnStartup(true)` (default) preserves existing fail-fast semantics.
Setting it to `false` enables graceful degradation — the operator starts
without MCP tools if the server is unreachable.
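The proposal does not spell out what `RefreshResult` contains. As a rough sketch only, one plausible shape is a diff of tool names before and after a refresh. The class name `RefreshResultSketch`, its fields, and the `diff` helper are all assumptions made for illustration, not part of the proposed API.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical shape for RefreshResult: a diff of tool names between two snapshots. */
final class RefreshResultSketch {
    final Set<String> added;
    final Set<String> removed;

    private RefreshResultSketch(Set<String> added, Set<String> removed) {
        this.added = added;
        this.removed = removed;
    }

    /** Compares the tool names known before a refresh with those returned by it. */
    static RefreshResultSketch diff(Set<String> before, Set<String> after) {
        Set<String> added = new TreeSet<>(after);
        added.removeAll(before);   // present now, absent before
        Set<String> removed = new TreeSet<>(before);
        removed.removeAll(after);  // present before, absent now
        return new RefreshResultSketch(added, removed);
    }

    /** True if the refresh added or removed at least one tool. */
    boolean hasChanges() {
        return !added.isEmpty() || !removed.isEmpty();
    }
}
```

A caller could log `added` and `removed` after each `refreshNow()` and skip cache updates when `hasChanges()` is false. Detecting tools whose schema changed under the same name would need extra information that this sketch does not model.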
---
## Implementation Details
### Part 1: Runtime Discovery
#### 1.1 New MCPServerResourceProvider
A new `ResourceProvider` that stores only MCP server configuration at compile
time and defers instantiation and discovery to runtime.
**File:**
`plan/src/main/java/.../resourceprovider/MCPServerResourceProvider.java` (NEW)
```java
public class MCPServerResourceProvider extends ResourceProvider {
    private final ResourceDescriptor descriptor;
    // Runtime-only state: never serialized with the plan
    private transient MCPServer serverInstance;
    private transient Map<String, MCPTool> discoveredTools;
    private transient Map<String, MCPPrompt> discoveredPrompts;

    public MCPServerResourceProvider(String name, ResourceDescriptor descriptor) {
        super(name, ResourceType.MCP_SERVER);
        this.descriptor = descriptor;
    }

    @Override
    public Resource provide(BiFunction<String, ResourceType, Resource> getResource) {
        // Lazily create the server so the same connection is reused at runtime
        if (serverInstance == null) {
            serverInstance = createMCPServer();
        }
        return serverInstance;
    }

    public void discoverResources() throws Exception {
        if (serverInstance == null) { provide(null); }
        discoveredTools = new HashMap<>();
        for (MCPTool tool : serverInstance.listTools()) {
            discoveredTools.put(tool.getName(), tool);
        }
        discoveredPrompts = new HashMap<>();
        for (MCPPrompt prompt : serverInstance.listPrompts()) {
            discoveredPrompts.put(prompt.getName(), prompt);
        }
    }

    // Accessors used by the operator and the JSON serializer
    public Map<String, MCPTool> getDiscoveredTools() { return discoveredTools; }
    public Map<String, MCPPrompt> getDiscoveredPrompts() { return discoveredPrompts; }
    public ResourceDescriptor getDescriptor() { return descriptor; }

    public void close() throws Exception {
        if (serverInstance != null) {
            serverInstance.close();
            serverInstance = null;
        }
    }

    // createMCPServer() and isFailFastOnStartup() omitted for brevity
}
```
#### 1.2 Modify AgentPlan.extractJavaMCPServer()
**File:** `plan/src/main/java/.../AgentPlan.java` (lines 407-452)
Remove all MCP server instantiation, `listTools()`/`listPrompts()` calls, and
`close()`. Replace with a simple provider registration:
```java
private void extractJavaMCPServer(Method method) throws Exception {
    String name = method.getName();
    ResourceDescriptor descriptor = (ResourceDescriptor) method.invoke(null);
    descriptor = new ResourceDescriptor(
            descriptor.getModule(),
            JAVA_MCP_SERVER_CLASS_NAME,
            new HashMap<>(descriptor.getInitialArguments()));

    // Store configuration only: no network calls
    MCPServerResourceProvider provider =
            new MCPServerResourceProvider(name, descriptor);
    addResourceProvider(provider);
}
```
**Impact:** `AgentPlan` construction drops from ~1-2s (MCP round-trip) to
<10ms. Builds no longer depend on MCP server availability.
#### 1.3 Runtime Discovery in ActionExecutionOperator
**File:** `runtime/src/main/java/.../operator/ActionExecutionOperator.java`
Add MCP resource discovery during `open()` and cleanup during `close()`:
```java
@Override
public void open() throws Exception {
    super.open();
    // ... existing initialization ...
    initMCPResources();
}

private void initMCPResources() throws Exception {
    Map<String, ResourceProvider> mcpProviders =
            agentPlan.getResourceProviders().get(ResourceType.MCP_SERVER);
    if (mcpProviders == null) return;

    for (var entry : mcpProviders.entrySet()) {
        if (!(entry.getValue() instanceof MCPServerResourceProvider)) continue;
        MCPServerResourceProvider provider =
                (MCPServerResourceProvider) entry.getValue();
        try {
            provider.discoverResources();
            for (var tool : provider.getDiscoveredTools().entrySet()) {
                agentPlan.cacheResource(tool.getKey(), ResourceType.TOOL,
                        tool.getValue());
            }
            for (var prompt : provider.getDiscoveredPrompts().entrySet()) {
                agentPlan.cacheResource(prompt.getKey(), ResourceType.PROMPT,
                        prompt.getValue());
            }
        } catch (Exception e) {
            if (provider.isFailFastOnStartup()) {
                throw new RuntimeException(
                        "MCP discovery failed for: " + entry.getKey(), e);
            }
            // Pass the exception so the cause of the outage is logged
            LOG.warn("Continuing without MCP server '{}': tools unavailable",
                    entry.getKey(), e);
        }
    }
}
```
#### 1.4 AgentPlan Cache Helper
**File:** `plan/src/main/java/.../AgentPlan.java`
```java
/** Cache a resource directly (used by runtime MCP discovery). */
public void cacheResource(String name, ResourceType type, Resource resource) {
    resourceCache
            .computeIfAbsent(type, k -> new ConcurrentHashMap<>())
            .put(name, resource);
}
```
#### 1.5 Serialization Support
**Files:** `ResourceProviderJsonSerializer.java`,
`ResourceProviderJsonDeserializer.java`
Add serialization/deserialization cases for `MCPServerResourceProvider` that
persist only the `ResourceDescriptor` (no runtime state):
```java
// Serializer
if (value instanceof MCPServerResourceProvider) {
    MCPServerResourceProvider provider = (MCPServerResourceProvider) value;
    gen.writeStringField("type", "mcp_server");
    gen.writeFieldName("descriptor");
    serializerProvider.findValueSerializer(ResourceDescriptor.class)
            .serialize(provider.getDescriptor(), gen, serializerProvider);
}

// Deserializer
case "mcp_server":
    ResourceDescriptor desc = parser.readValueAs(ResourceDescriptor.class);
    return new MCPServerResourceProvider(name, desc);
```
---
### Part 2: Dynamic Refresh
Once runtime discovery is in place, dynamic refresh builds on the same
infrastructure. We propose a phased approach with multiple strategies.
#### 2.1 Periodic Polling (Phase 1)
A background daemon thread periodically calls `refreshNow()`:
```java
public void startAutoRefresh() {
    if (!autoRefreshEnabled || refreshScheduler != null) return;
    refreshScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "MCP-Refresh-" + endpoint);
        t.setDaemon(true);
        return t;
    });
    refreshScheduler.scheduleAtFixedRate(
            () -> {
                try {
                    refreshNow();
                } catch (Exception e) {
                    LOG.warn("Periodic refresh failed for {}", endpoint, e);
                }
            },
            refreshInterval.toMillis(),
            refreshInterval.toMillis(),
            TimeUnit.MILLISECONDS);
}
```
**Flink Integration:** `ActionExecutionOperator.open()` calls
`startAutoRefresh()` on each MCP server after initial discovery. `close()`
calls `stopAutoRefresh()`.
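The matching `stopAutoRefresh()` is not shown above. A minimal sketch of the full start/stop lifecycle could look like the following. The class name `AutoRefreshLifecycleSketch`, the `Runnable` standing in for `refreshNow()`, and the 5-second shutdown wait are assumptions chosen for illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Minimal start/stop lifecycle around a single-threaded refresh scheduler. */
final class AutoRefreshLifecycleSketch {
    private final Runnable refreshTask;   // stands in for refreshNow()
    private final long intervalMillis;
    private ScheduledExecutorService scheduler;

    AutoRefreshLifecycleSketch(Runnable refreshTask, long intervalMillis) {
        this.refreshTask = refreshTask;
        this.intervalMillis = intervalMillis;
    }

    synchronized void startAutoRefresh() {
        if (scheduler != null) return;    // already running
        scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "MCP-Refresh-sketch");
            t.setDaemon(true);
            return t;
        });
        scheduler.scheduleAtFixedRate(() -> {
            // An uncaught exception would cancel all future runs, so swallow it here.
            try { refreshTask.run(); } catch (RuntimeException e) { /* log and continue */ }
        }, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    synchronized void stopAutoRefresh() {
        if (scheduler == null) return;    // not running
        scheduler.shutdown();             // no new runs; an in-flight refresh may finish
        try {
            if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                scheduler.shutdownNow();  // interrupt a refresh that is stuck
            }
        } catch (InterruptedException e) {
            scheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
        scheduler = null;                 // allows a later restart
    }

    synchronized boolean isAutoRefreshRunning() {
        return scheduler != null;
    }
}
```

Waiting for termination in the stop path means no refresh is still writing to the tool cache once the operator's `close()` returns, at the cost of a bounded delay during shutdown.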
#### 2.3 Thread Safety
Tool reads are frequent (every tool call); refreshes are infrequent. We use
`StampedLock` for optimal read-heavy concurrency:
```java
private final StampedLock lock = new StampedLock();

public MCPTool getTool(String name) {
    // Optimistic read: no lock contention on the happy path.
    // Note: this read can overlap a writer, so toolsCache must tolerate
    // concurrent access (for example, a ConcurrentHashMap).
    long stamp = lock.tryOptimisticRead();
    MCPTool tool = toolsCache.get(name);
    if (!lock.validate(stamp)) {
        // Fallback to pessimistic read
        stamp = lock.readLock();
        try {
            tool = toolsCache.get(name);
        } finally {
            lock.unlockRead(stamp);
        }
    }
    return tool;
}

public void updateTools(List<MCPTool> newTools) {
    long stamp = lock.writeLock();
    try {
        toolsCache.clear();
        newTools.forEach(t -> toolsCache.put(t.getName(), t));
    } finally {
        lock.unlockWrite(stamp);
    }
}
```
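For comparison, a simpler alternative to `StampedLock` is to publish an immutable snapshot through a `volatile` field. This is not what the proposal specifies, only a sketch of a different technique: readers never see a half-updated map, and a refresh replaces the whole map in one write. The class name `SnapshotToolCacheSketch` and the generic tool type are assumptions for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Copy-on-write alternative: readers see an immutable snapshot, refresh swaps it. */
final class SnapshotToolCacheSketch<T> {
    private final Function<T, String> nameOf;
    private volatile Map<String, T> snapshot = Map.of();

    SnapshotToolCacheSketch(Function<T, String> nameOf) {
        this.nameOf = nameOf;
    }

    T getTool(String name) {
        return snapshot.get(name);  // single volatile read, no locking
    }

    void updateTools(List<T> newTools) {
        // Build the new map off to the side, then publish it in one volatile write.
        // toUnmodifiableMap throws IllegalStateException on duplicate names.
        snapshot = newTools.stream()
                .collect(Collectors.toUnmodifiableMap(nameOf, Function.identity()));
    }
}
```

The trade-off is one map allocation per refresh, which is usually negligible when refreshes happen minutes apart.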
#### 2.4 Graceful Degradation
On refresh failures, the server falls back to the last known good state:
```java
private Map<String, MCPTool> fallbackTools; // Last successful discovery
private int consecutiveRefreshFailures = 0;

private void handleRefreshFailure(Exception e) {
    consecutiveRefreshFailures++;
    if (consecutiveRefreshFailures >= maxConsecutiveFailures) {
        connectionState = ConnectionState.FAILED;
        // Restore fallback
        toolsCache.clear();
        toolsCache.putAll(fallbackTools);
    } else {
        connectionState = ConnectionState.DEGRADED;
        // Keep using current cache
    }
}
```
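The snippet above covers only the failure path. A sketch of how the success path might pair with it, resetting the failure counter and recording a new fallback, is shown below. The `HEALTHY` state, the `handleRefreshSuccess` method, and the use of `String` values in place of `MCPTool` are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of the refresh state machine, including the success path. */
final class RefreshStateSketch {
    enum ConnectionState { HEALTHY, DEGRADED, FAILED }  // HEALTHY is an assumed name

    private final int maxConsecutiveFailures;
    private final Map<String, String> toolsCache = new HashMap<>();
    private Map<String, String> fallbackTools = Map.of();
    private int consecutiveRefreshFailures = 0;
    private ConnectionState connectionState = ConnectionState.HEALTHY;

    RefreshStateSketch(int maxConsecutiveFailures) {
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    synchronized void handleRefreshSuccess(Map<String, String> discovered) {
        toolsCache.clear();
        toolsCache.putAll(discovered);
        fallbackTools = Map.copyOf(discovered);  // becomes the last known good state
        consecutiveRefreshFailures = 0;
        connectionState = ConnectionState.HEALTHY;
    }

    synchronized void handleRefreshFailure(Exception e) {
        consecutiveRefreshFailures++;
        if (consecutiveRefreshFailures >= maxConsecutiveFailures) {
            connectionState = ConnectionState.FAILED;
            toolsCache.clear();
            toolsCache.putAll(fallbackTools);    // restore last known good state
        } else {
            connectionState = ConnectionState.DEGRADED;  // keep using current cache
        }
    }

    synchronized ConnectionState state() { return connectionState; }

    synchronized Map<String, String> tools() { return Map.copyOf(toolsCache); }
}
```

With a threshold of 2, one failed refresh moves the state to `DEGRADED`, a second moves it to `FAILED`, and the next successful refresh returns it to `HEALTHY`.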
#### 2.5 Operator-Level vs Task-Manager-Level Refresh
| Approach | Threads | Isolation | Complexity |
|----------|---------|-----------|------------|
| **Operator-level** (recommended) | One per operator per MCP server | Full isolation between operators | Simple |
| Task-manager-level | One shared thread | Shared state across operators | Requires global registry |
We recommend **operator-level refresh** for simplicity and isolation. Each
`ActionExecutionOperator` manages the refresh lifecycle of its own MCP servers.
---
## Future Work
### Event-Driven Refresh
The MCP specification defines list-change notifications such as
`notifications/tools/list_changed`. For servers that emit them, the client
could subscribe for near-instant tool updates:
```java
MCPServer server = MCPServer.builder("http://api.example.com/mcp")
    .eventDrivenRefresh(true) // Subscribe to SSE notifications
    .build();

server.addChangeListener(type -> {
    LOG.info("MCP resources updated: {}", type);
});
```
This would eliminate polling overhead entirely but requires MCP server-side
support. The implementation should fall back to polling if the SSE connection
drops.
### TTL-Based Lazy Refresh
An alternative cache strategy where tools are refreshed lazily on access after
a configurable TTL expires. Good for development environments with sporadic
tool usage — avoids background threads while still ensuring freshness.
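A rough sketch of the TTL idea, assuming a loader that stands in for `listTools()` and an injectable clock so that expiry can be tested without sleeping. The class name `TtlToolCacheSketch` and its constructor parameters are hypothetical.

```java
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Lazy refresh: reload the tool map on access once the TTL has elapsed. */
final class TtlToolCacheSketch<T> {
    private final Supplier<Map<String, T>> loader;  // stands in for listTools()
    private final long ttlMillis;
    private final LongSupplier clock;               // e.g. System::currentTimeMillis
    private Map<String, T> tools = Map.of();
    private long loadedAt;
    private boolean loaded = false;

    TtlToolCacheSketch(Supplier<Map<String, T>> loader, long ttlMillis, LongSupplier clock) {
        this.loader = loader;
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    synchronized T getTool(String name) {
        long now = clock.getAsLong();
        if (!loaded || now - loadedAt >= ttlMillis) {
            try {
                tools = Map.copyOf(loader.get());
                loadedAt = now;
                loaded = true;
            } catch (RuntimeException e) {
                // Reload failed: keep serving the previous snapshot and retry on next access.
            }
        }
        return tools.get(name);
    }
}
```

The caller that triggers a reload pays the network latency for that one lookup, which is the main cost of avoiding a background thread.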
### Advanced Resilience
- **Circuit breaker pattern** for failing MCP servers (prevent retry storms)
- **Tool versioning and schema validation** (detect incompatible tool signature
changes)
- **Metrics export** via Flink's metrics system for Prometheus/Grafana
dashboards
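To make the circuit breaker item concrete, here is a minimal sketch, assuming a consecutive-failure threshold and a fixed cool-down period. The class name `CircuitBreakerSketch`, its parameters, and the injectable clock are hypothetical and not part of the proposal.

```java
import java.util.function.LongSupplier;

/** Minimal circuit breaker: opens after N consecutive failures, retries after a cool-down. */
final class CircuitBreakerSketch {
    private final int failureThreshold;
    private final long openMillis;
    private final LongSupplier clock;  // e.g. System::currentTimeMillis
    private int consecutiveFailures = 0;
    private long openedAt = 0;
    private boolean open = false;

    CircuitBreakerSketch(int failureThreshold, long openMillis, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
        this.clock = clock;
    }

    /** Returns false while the breaker is open and the cool-down has not elapsed. */
    synchronized boolean allowRequest() {
        if (!open) return true;
        // After the cool-down, calls are allowed again until a failure re-opens the breaker.
        return clock.getAsLong() - openedAt >= openMillis;
    }

    synchronized void recordSuccess() {
        consecutiveFailures = 0;
        open = false;
    }

    synchronized void recordFailure() {
        consecutiveFailures++;
        if (consecutiveFailures >= failureThreshold) {
            open = true;
            openedAt = clock.getAsLong();  // restart the cool-down
        }
    }
}
```

A refresh loop would call `allowRequest()` before contacting the server and report the outcome with `recordSuccess()` or `recordFailure()`, so a server that is down gets at most one probe per cool-down period.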
### Python MCP Server Support
Extend runtime discovery to Python MCP servers using the existing Python client.
## Other Agents' Approaches
- LangChain uses a polling approach to dynamically load tools and prompts.
- Claude Desktop and Claude Code use server-sent notifications to refresh the
list of available tools.
GitHub link: https://github.com/apache/flink-agents/discussions/543