This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 9e4aca8dbc [Metrics] Add a few WebSocket metrics
9e4aca8dbc is described below
commit 9e4aca8dbc7f472183f7358ba9e0b8509c836a9f
Author: Quan Tran <[email protected]>
AuthorDate: Thu Oct 31 14:16:00 2024 +0700
[Metrics] Add a few WebSocket metrics
- open connections count
- requests count
---
.../src/main/java/org/apache/james/metrics/api/Metric.java | 9 +++++++++
.../apache/james/metrics/dropwizard/DropWizardMetric.java | 5 +++++
.../org/apache/james/jmap/routes/WebSocketRoutes.scala | 14 ++++++++++----
3 files changed, 24 insertions(+), 4 deletions(-)
diff --git
a/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/Metric.java
b/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/Metric.java
index 87efdd9843..131fc6bc2f 100644
--- a/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/Metric.java
+++ b/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/Metric.java
@@ -46,4 +46,13 @@ public interface Metric {
default double movingAverage() {
return Long.valueOf(getCount()).doubleValue();
}
+
+ /**
+ * Mean rate of the events happen in one second.
+ *
+ * Default to count (naive implementation with period starting at boot
time)
+ */
+ default double meanRate() {
+ return Long.valueOf(getCount()).doubleValue();
+ }
}
diff --git
a/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetric.java
b/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetric.java
index 02d6fa55ad..fc271917c7 100644
---
a/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetric.java
+++
b/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetric.java
@@ -72,4 +72,9 @@ public class DropWizardMetric implements Metric {
public double movingAverage() {
return meter.getFiveMinuteRate();
}
+
+ @Override
+ public double meanRate() {
+ return meter.getMeanRate();
+ }
}
diff --git
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
index f596508897..a861a99be1 100644
---
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
+++
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
@@ -41,6 +41,7 @@ import org.apache.james.jmap.http.{Authenticator,
UserProvisioning}
import org.apache.james.jmap.json.{PushSerializer, ResponseSerializer}
import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes, InjectionKeys
=> JMAPInjectionKeys}
import org.apache.james.mailbox.MailboxSession
+import org.apache.james.metrics.api.{Metric, MetricFactory}
import org.apache.james.user.api.DelegationStore
import org.slf4j.{Logger, LoggerFactory}
import play.api.libs.json.Json
@@ -79,7 +80,10 @@ class WebSocketRoutes @Inject()
(@Named(InjectionKeys.RFC_8621) val authenticato
emailChangeRepository: EmailChangeRepository,
pushSerializer: PushSerializer,
typeStateFactory: TypeStateFactory,
- delegationStore: DelegationStore) extends
JMAPRoutes {
+ delegationStore: DelegationStore,
+ metricFactory: MetricFactory) extends
JMAPRoutes {
+ private val openingConnectionsMetric: Metric =
metricFactory.generate("jmap_websocket_opening_connections_count")
+ private val requestCountMetric: Metric =
metricFactory.generate("jmap_websocket_requests_count")
override def routes(): stream.Stream[JMAPRoute] = stream.Stream.of(
JMAPRoute.builder
@@ -91,16 +95,17 @@ class WebSocketRoutes @Inject()
(@Named(InjectionKeys.RFC_8621) val authenticato
.action(JMAPRoutes.CORS_CONTROL)
.corsHeaders())
- private def handleWebSockets(httpServerRequest: HttpServerRequest,
httpServerResponse: HttpServerResponse): Mono[Void] = {
+ private def handleWebSockets(httpServerRequest: HttpServerRequest,
httpServerResponse: HttpServerResponse): Mono[Void] =
SMono(authenticator.authenticate(httpServerRequest))
.flatMap((mailboxSession: MailboxSession) =>
userProvisioner.provisionUser(mailboxSession)
.`then`
.`then`(SMono(httpServerResponse.addHeader(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL,
"jmap")
- .sendWebsocket((in, out) =>
handleWebSocketConnection(mailboxSession)(in, out)))))
+ .sendWebsocket((in, out) =>
handleWebSocketConnection(mailboxSession)(in, out)))
+ .doOnSubscribe(_ => openingConnectionsMetric.increment())
+ .doOnTerminate(() => openingConnectionsMetric.decrement())))
.onErrorResume(throwable => handleHttpHandshakeError(throwable,
httpServerResponse))
.asJava()
.`then`()
- }
private def handleWebSocketConnection(session: MailboxSession)(in:
WebsocketInbound, out: WebsocketOutbound): Mono[Void] = {
val sink: Sinks.Many[OutboundMessage] =
Sinks.many().unicast().onBackpressureBuffer()
@@ -113,6 +118,7 @@ class WebSocketRoutes @Inject()
(@Named(InjectionKeys.RFC_8621) val authenticato
frame.content().readBytes(bytes)
new String(bytes, StandardCharsets.UTF_8)
})
+ .doOnNext(_ => requestCountMetric.increment())
.flatMap(message => handleClientMessages(context)(message))
.doOnTerminate(context.clean)
.doOnCancel(context.clean)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]