This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 57afe79251 [FIX] Correct websocket metrics
57afe79251 is described below
commit 57afe7925131b84d7e9327f8c641121d98a34689
Author: Benoit TELLIER <[email protected]>
AuthorDate: Mon Dec 9 18:19:37 2024 +0100
[FIX] Correct websocket metrics
Count of connection was incremented and decremented as part
of the initial HTTP exchange.
This is wrong, it needs to be managed in the socket phase.
---
.../apache/james/jmap/routes/WebSocketRoutes.scala | 19 ++++++++++++-------
1 file changed, 12 insertions(+), 7 deletions(-)
diff --git
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
index 6cd72f359c..1406fe87a3 100644
---
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
+++
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
@@ -103,15 +103,14 @@ class WebSocketRoutes @Inject()
(@Named(InjectionKeys.RFC_8621) val authenticato
.flatMap((mailboxSession: MailboxSession) =>
userProvisioner.provisionUser(mailboxSession)
.`then`
.`then`(SMono(httpServerResponse.addHeader(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL,
"jmap")
- .sendWebsocket((in, out) =>
handleWebSocketConnection(mailboxSession)(in, out)))
- .doOnSubscribe(_ => openingConnectionsMetric.increment())
- .doOnTerminate(() => openingConnectionsMetric.decrement())))
+ .sendWebsocket((in, out) =>
handleWebSocketConnection(mailboxSession)(in, out)))))
.onErrorResume(throwable => handleHttpHandshakeError(throwable,
httpServerResponse))
.asJava()
.`then`()
private def handleWebSocketConnection(session: MailboxSession)(in:
WebsocketInbound, out: WebsocketOutbound): Mono[Void] = {
val sink: Sinks.Many[OutboundMessage] =
Sinks.many().unicast().onBackpressureBuffer()
+ openingConnectionsMetric.increment()
val context = ClientContext(sink, new AtomicReference[Registration](),
session)
val responseFlux: SFlux[OutboundMessage] =
SFlux[WebSocketFrame](in.aggregateFrames()
@@ -124,10 +123,16 @@ class WebSocketRoutes @Inject()
(@Named(InjectionKeys.RFC_8621) val authenticato
.doOnNext(_ => connectedUsers.put(context, context))
.doOnNext(_ => requestCountMetric.increment())
.flatMap(message => handleClientMessages(context)(message))
- .doOnTerminate(context.clean)
- .doOnCancel(context.clean)
- .doOnTerminate(() => connectedUsers.remove(context))
- .doOnCancel(() => connectedUsers.remove(context))
+ .doOnTerminate(() => {
+ context.clean()
+ connectedUsers.remove(context)
+ openingConnectionsMetric.decrement()
+ })
+ .doOnCancel(() => {
+ context.clean()
+ connectedUsers.remove(context)
+ openingConnectionsMetric.decrement()
+ })
out.sendString(
SFlux.merge(Seq(responseFlux, sink.asFlux()))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]