Copilot commented on code in PR #13551:
URL: https://github.com/apache/apisix/pull/13551#discussion_r3410979846
##########
apisix/plugins/mcp/server_wrapper.lua:
##########
@@ -33,6 +33,10 @@ local function sse_handler(conf, ctx, opts)
local server = opts.server
+ -- mark the session as live before advertising its endpoint, so a message
+ -- sent right after the client learns the session id is accepted
+ server:register()
+
-- send endpoint event to advertise the message endpoint
Review Comment:
`server:register()` is called unconditionally and its result is ignored. If
registration fails (e.g., the shared dict is out of memory), the code still
advertises the endpoint and proceeds. Separately, when `on_connect` returns an
error, the early return path does not call `server:close()`, so the session
marker and queue may be left behind until TTL expiry, and `/message` can accept
requests for a non-live session in the meantime.
Check the result of `register()` before advertising the endpoint, and ensure
`server:close()` runs before returning when `on_connect` fails.
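A minimal sketch of the control flow this comment asks for, not the actual
`sse_handler` body. The stub server, its `on_connect` method, the `advertised`
flag (standing in for sending the endpoint event), and the 500 status codes are
all hypothetical and exist only to make the example self-contained.

```lua
-- Hypothetical stand-in for the MCP server object; only its shape matters.
local function new_stub_server(register_ok, connect_err)
    local server = { closed = false, advertised = false }
    function server:register()
        if register_ok then
            return true
        end
        return nil, "failed to register session: no memory"
    end
    function server:on_connect()
        if connect_err then
            return nil, connect_err
        end
        return true
    end
    function server:close()
        self.closed = true
    end
    return server
end

-- Sketch of the handler flow: register, advertise, connect, clean up on error.
local function sse_handler_sketch(server)
    local ok, err = server:register()
    if not ok then
        -- never advertise an endpoint for a session that was not registered
        return 500, err
    end

    server.advertised = true -- stands in for sending the endpoint event

    ok, err = server:on_connect()
    if not ok then
        -- drop the session marker and queue now instead of waiting for the TTL
        server:close()
        return 500, err
    end
    return 200
end
```

With this shape, a failed `register()` returns before anything is advertised,
and a failed `on_connect` always goes through `close()` on its way out.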
##########
apisix/plugins/mcp/server.lua:
##########
@@ -89,6 +104,9 @@ function _M.start(self)
self.need_exit = true
break
end
+ -- refresh the session marker so it stays alive as long as the
+ -- connection does, and expires shortly after it goes away
+ self.message_broker:register()
ngx_sleep(30)
Review Comment:
The ping loop refreshes the live-session marker via
`self.message_broker:register()`, but ignores its return value. If the shared
dict is full or `set()` fails for any reason, the marker can expire while the
SSE connection is still active, causing `/message` to start returning 404 for a
live session.
Handle register failures by logging and terminating the session (or
otherwise ensuring the marker remains consistent with the connection state).
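One way this could look, as a sketch under stated assumptions rather than the
real `_M.start` loop. The stub broker, the bounded iteration count, and the
injected `log` and `sleep` functions (standing in for `ngx.log` and
`ngx_sleep`) are hypothetical, so the example runs outside OpenResty.

```lua
-- Hypothetical broker whose register() fails on a chosen call.
local function new_stub_broker(fail_on_call)
    local broker = { calls = 0 }
    function broker:register()
        self.calls = self.calls + 1
        if self.calls == fail_on_call then
            return nil, "failed to register session: no memory"
        end
        return true
    end
    return broker
end

-- Sketch of the ping loop: refresh the marker, stop the session if that fails.
-- The real loop is unbounded; max_iterations only keeps the example finite.
local function ping_loop_sketch(session, max_iterations, log, sleep)
    for _ = 1, max_iterations do
        local ok, err = session.message_broker:register()
        if not ok then
            -- the marker may now expire, so end the session rather than
            -- leave a live connection that /message would answer with 404
            log("failed to refresh session marker: " .. tostring(err))
            session.need_exit = true
            break
        end
        sleep(30)
    end
    return session.need_exit == true
end
```

Terminating on failure keeps the marker and the connection state in agreement:
either both exist or neither does.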
##########
apisix/plugins/mcp/broker/shared_dict.lua:
##########
@@ -45,14 +54,52 @@ function _M.on(self, event, cb)
end
+-- record that this session has a live SSE connection on some worker. the
+-- message endpoint consults this before queueing, so that a request can only
+-- enqueue work for a session that actually exists. refreshed periodically by
+-- the owning session and removed on teardown; the TTL is a backstop for an
+-- unclean teardown.
+function _M.register(self)
+ local ok, err = shared_dict:set(self.session_id .. STORAGE_SUFFIX_SESSION, true, SESSION_TTL)
+ if not ok then
+ return nil, "failed to register session: " .. err
+ end
+ return true
+end
+
+
+function _M.unregister(self)
+ shared_dict:delete(self.session_id .. STORAGE_SUFFIX_SESSION)
+ shared_dict:delete(self.session_id .. STORAGE_SUFFIX_QUEUE)
+end
+
+
+-- whether a session currently has a live SSE connection. module-level by
+-- design: the message endpoint checks this before creating a server.
+function _M.session_exists(session_id)
+ if not session_id then
+ return false
+ end
+ return shared_dict:get(session_id .. STORAGE_SUFFIX_SESSION) ~= nil
+end
+
+
function _M.push(self, message)
if not message then
return nil, "message is nil"
end
- local ok, err = shared_dict:rpush(self.session_id .. STORAGE_SUFFIX_QUEUE, message)
+ local key = self.session_id .. STORAGE_SUFFIX_QUEUE
+ local len = shared_dict:llen(key)
+ if len and len >= QUEUE_MAX_LENGTH then
+ return nil, "queue is full"
+ end
+ local ok, err = shared_dict:rpush(key, message)
if not ok then
return nil, "failed to push message to queue: " .. err
end
+ -- keep the queue from outliving its session if teardown is missed
+ shared_dict:expire(key, SESSION_TTL)
return true
Review Comment:
`push()` uses a separate `llen()` check before `rpush()` to enforce
`QUEUE_MAX_LENGTH`. This is not atomic: concurrent requests can pass the length
check and collectively grow the queue beyond the cap.
Consider using the length returned by `rpush()` to detect overflow and roll
back the last push (or otherwise enforce the cap atomically) so the bound holds
under concurrency.
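A sketch of the push-then-roll-back variant, assuming the list semantics of
`ngx.shared.DICT`, where `rpush` returns the list length after the push. The
in-memory stub dict and the `QUEUE_MAX_LENGTH` value of 3 are hypothetical
stand-ins so the example runs outside OpenResty. Note the limits of this
approach: the queue can still briefly exceed the cap while an overflowing push
is being rolled back, and under some interleavings `rpop` may remove a message
pushed by a concurrent caller rather than the caller's own.

```lua
local QUEUE_MAX_LENGTH = 3 -- illustrative value, not taken from the PR

-- Minimal in-memory stand-in for the list API of ngx.shared.DICT.
local function new_stub_dict()
    local lists = {}
    local dict = {}
    function dict:rpush(key, value)
        local list = lists[key] or {}
        lists[key] = list
        list[#list + 1] = value
        return #list -- length after the push, as rpush reports it
    end
    function dict:rpop(key)
        local list = lists[key]
        if not list or #list == 0 then
            return nil
        end
        return table.remove(list)
    end
    function dict:llen(key)
        local list = lists[key]
        return list and #list or 0
    end
    return dict
end

-- Sketch of push(): rely on the length returned by rpush instead of a
-- separate llen() check, and undo the push when it overflows the cap.
local function push_sketch(dict, key, message)
    if not message then
        return nil, "message is nil"
    end
    local len, err = dict:rpush(key, message)
    if not len then
        return nil, "failed to push message to queue: " .. tostring(err)
    end
    if len > QUEUE_MAX_LENGTH then
        dict:rpop(key) -- roll back the element that overflowed the queue
        return nil, "queue is full"
    end
    return true
end
```

Because the overflow decision uses the length observed by the push itself, two
concurrent callers cannot both pass a stale length check and leave the queue
permanently over the cap.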
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.