This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 48d1361172 JAMES-3491 - JMAP WebSockets - support ping interval (#2561)
48d1361172 is described below
commit 48d1361172e9fe58bb2bba281a3f018e205554c4
Author: vttran <[email protected]>
AuthorDate: Wed Dec 18 09:14:50 2024 +0700
JAMES-3491 - JMAP WebSockets - support ping interval (#2561)
---
docs/modules/servers/partials/configure/jmap.adoc | 4 +
.../apache/james/modules/TestJMAPServerModule.java | 40 +++-
.../WebSocketWithPingIntervalContract.scala | 227 +++++++++++++++++++++
.../MemoryWebSocketWithPingIntervalTest.java | 65 ++++++
.../james/jmap/core/JmapRfc8621Configuration.scala | 14 +-
.../apache/james/jmap/routes/WebSocketRoutes.scala | 34 +--
src/site/xdoc/server/config-jmap.xml | 5 +
7 files changed, 374 insertions(+), 15 deletions(-)
diff --git a/docs/modules/servers/partials/configure/jmap.adoc
b/docs/modules/servers/partials/configure/jmap.adoc
index c5f697c53f..b2f0419844 100644
--- a/docs/modules/servers/partials/configure/jmap.adoc
+++ b/docs/modules/servers/partials/configure/jmap.adoc
@@ -33,6 +33,10 @@ Defaults to an empty list.
| websocket.url.prefix
| Optional. URL for JMAP WebSocket route. Default value: ws://localhost
+| websocket.ping.interval
+| Optional. Configures the interval between consecutive ping
messages (as specified in RFC 6455) sent by the server to the client over a
WebSocket connection.
+The supported unit is seconds (e.g. `3s` for a 3-second interval) and the value must be strictly positive.
Defaults to empty, which disables this feature.
+
| email.send.max.size
| Optional. Configuration max size for message created in RFC-8621.
Default value: None. Supported units are B (bytes) K (KB) M (MB) G (GB).
diff --git
a/server/container/guice/protocols/jmap/src/test/java/org/apache/james/modules/TestJMAPServerModule.java
b/server/container/guice/protocols/jmap/src/test/java/org/apache/james/modules/TestJMAPServerModule.java
index 7b70263afc..8e27983cbb 100644
---
a/server/container/guice/protocols/jmap/src/test/java/org/apache/james/modules/TestJMAPServerModule.java
+++
b/server/container/guice/protocols/jmap/src/test/java/org/apache/james/modules/TestJMAPServerModule.java
@@ -19,23 +19,51 @@
package org.apache.james.modules;
+import static
org.apache.james.jmap.core.JmapRfc8621Configuration.LOCALHOST_CONFIGURATION;
+
import java.io.FileNotFoundException;
+import java.util.Map;
import java.util.Optional;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
+import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.james.jmap.JMAPConfiguration;
+import org.apache.james.jmap.core.JmapRfc8621Configuration;
import org.apache.james.jwt.JwtConfiguration;
import org.apache.james.jwt.JwtTokenVerifier;
import org.apache.james.modules.mailbox.FastRetryBackoffModule;
+import org.apache.james.utils.PropertiesProvider;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
public class TestJMAPServerModule extends AbstractModule {
+ static class JmapRfc8621ConfigurationOverrideModule extends AbstractModule
{
+ private final Map<String, Object> overrideJmapProperties;
+
+ JmapRfc8621ConfigurationOverrideModule(Map<String, Object>
overrideJmapProperties) {
+ this.overrideJmapProperties = overrideJmapProperties;
+ }
+
+ @Provides
+ @Singleton
+ JmapRfc8621Configuration provideConfiguration(PropertiesProvider
propertiesProvider) throws ConfigurationException {
+ try {
+ Configuration configuration =
propertiesProvider.getConfiguration("jmap");
+ overrideJmapProperties.forEach(configuration::setProperty);
+ return JmapRfc8621Configuration.from(configuration);
+ } catch (FileNotFoundException e) {
+ return LOCALHOST_CONFIGURATION();
+ }
+ }
+ }
+
+ private final Map<String, Object> overrideJmapProperties;
private static final String PUBLIC_PEM_KEY =
"-----BEGIN PUBLIC KEY-----\n" +
@@ -80,15 +108,25 @@ public class TestJMAPServerModule extends AbstractModule {
"ICQil1aaN7/2au+p7E4n7nzfYG7nRX5syDoqgBbdhpJxV8/5ohA=\n" +
"-----END RSA PRIVATE KEY-----\n";
+ public TestJMAPServerModule(Map<String, Object> overrideJmapProperties) {
+ this.overrideJmapProperties = overrideJmapProperties;
+ }
+
+ public TestJMAPServerModule() {
+ this(ImmutableMap.of());
+ }
@Override
protected void configure() {
install(new FastRetryBackoffModule());
+ if (!overrideJmapProperties.isEmpty()) {
+ install(new
JmapRfc8621ConfigurationOverrideModule(overrideJmapProperties));
+ }
}
@Provides
@Singleton
- JMAPConfiguration provideConfiguration() throws FileNotFoundException,
ConfigurationException {
+ JMAPConfiguration provideConfiguration() {
return JMAPConfiguration.builder()
.enable()
.randomPort()
diff --git
a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketWithPingIntervalContract.scala
b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketWithPingIntervalContract.scala
new file mode 100644
index 0000000000..7e199ca6fa
--- /dev/null
+++
b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketWithPingIntervalContract.scala
@@ -0,0 +1,227 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.rfc8621.contract
+
+import java.net.URI
+import java.util.UUID
+
+import net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson
+import okhttp3.OkHttpClient
+import org.apache.james.GuiceJamesServer
+import org.apache.james.jmap.JmapGuiceProbe
+import org.apache.james.jmap.api.change.State
+import org.apache.james.jmap.api.model.AccountId
+import org.apache.james.jmap.core.{PushState, UuidState}
+import org.apache.james.jmap.rfc8621.contract.Fixture._
+import org.apache.james.mailbox.model.MailboxPath
+import org.apache.james.modules.MailboxProbeImpl
+import org.apache.james.utils.DataProbeImpl
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.{AfterEach, Test, Timeout}
+import sttp.capabilities.WebSockets
+import sttp.client3.monad.IdMonad
+import sttp.client3.okhttp.OkHttpSyncBackend
+import sttp.client3.{Identity, RequestT, SttpBackend, asWebSocket,
basicRequest}
+import sttp.model.Uri
+import sttp.monad.MonadError
+import sttp.ws.WebSocketFrame.Text
+import sttp.ws.{WebSocket, WebSocketFrame}
+
+import scala.jdk.CollectionConverters._
+
+trait WebSocketWithPingIntervalContract {
+ private lazy val backend: SttpBackend[Identity, WebSockets] =
OkHttpSyncBackend()
+ private lazy implicit val monadError: MonadError[Identity] = IdMonad
+
+ def startJmapServer(overrideJmapProperties: Map[String, Object]):
GuiceJamesServer
+
+ def stopJmapServer(): Unit
+
+ @AfterEach
+ def afterEach(): Unit = {
+ stopJmapServer()
+ }
+
+ private def setUpJmapServer(overrideJmapProperties: Map[String, Object] =
Map.empty): GuiceJamesServer = {
+ val server = startJmapServer(overrideJmapProperties)
+ server.getProbe(classOf[DataProbeImpl])
+ .fluent()
+ .addDomain(DOMAIN.asString())
+ .addUser(ANDRE.asString(), ANDRE_PASSWORD)
+ .addUser(BOB.asString(), BOB_PASSWORD)
+ .addUser(DAVID.asString(), "secret")
+ server
+ }
+
+ @Test
+ @Timeout(180)
+ def apiRequestsShouldBeProcessedWhenClientPingInterval(): Unit = {
+ val server = setUpJmapServer()
+    // Given a client that sends a PING frame every 2s
+ val intervalDurationInMillis = 2000
+ val backend: SttpBackend[Identity, WebSockets] =
OkHttpSyncBackend.usingClient(new OkHttpClient.Builder()
+ .pingInterval(intervalDurationInMillis,
java.util.concurrent.TimeUnit.MILLISECONDS)
+ .build())
+
+    // The WebSocket connection is kept alive while the client pings at that interval
+ val response: Either[String, WebSocketFrame] = authenticatedRequest(server)
+ .response(asWebSocket[Identity, WebSocketFrame] { ws:
WebSocket[Identity] =>
+ sendEchoTextFrame(ws)
+ Thread.sleep(intervalDurationInMillis * 3)
+ ws.receive()
+ })
+ .send(backend)
+ .body
+
+ val responseAsFrame = response.toOption.get
+ assertThat(responseAsFrame).isInstanceOf(classOf[WebSocketFrame.Text])
+ assertThatJson(responseAsFrame.asPayload)
+ .isEqualTo(
+ """{
+ | "@type":"Response",
+ | "requestId":"req-36",
+ | "sessionState":"2c9f1b12-b35a-43e6-9af2-0106fb53a943",
+ | "methodResponses":[
+ | ["Core/echo",
+ | {
+ | "arg1":"arg1data",
+ | "arg2":"arg2data"
+ | },"c1"]
+ | ]
+ |}
+ |""".stripMargin)
+ }
+
+ @Test
+ @Timeout(180)
+ def apiRequestsShouldBeProcessedWhenConfigurePingIntervalResponse(): Unit = {
+ // Given a server with configured ping interval of 2s
+ val server = setUpJmapServer(Map("websocket.ping.interval" -> "2s"))
+
+ val requestId1 = UUID.randomUUID().toString
+ val requestId2 = UUID.randomUUID().toString
+ val response: Either[String, List[WebSocketFrame]] =
+ authenticatedRequest(server)
+ .response(asWebSocket[Identity, List[WebSocketFrame]] {
+ ws =>
+ sendEchoTextFrame(ws, requestId1)
+ Thread.sleep(2000)
+ val frame1 = ws.receive()
+ sendEchoTextFrame(ws, requestId2)
+ Thread.sleep(2000)
+ val frame2 = ws.receive()
+ List(frame1, frame2)
+ })
+ .send(backend)
+ .body
+
+ val listResponseFrame =
response.toOption.get.map(_.asInstanceOf[Text]).map(_.payload)
+ assertThat(listResponseFrame.asJava).hasSize(2)
+ assertThat(listResponseFrame.filter(frame =>
frame.contains(requestId1)).asJava).hasSize(1)
+ assertThat(listResponseFrame.filter(frame =>
frame.contains(requestId2)).asJava).hasSize(1)
+ }
+
+ @Test
+ @Timeout(180)
+ def pushEnableRequestsShouldBeProcessedWhenConfigurePingIntervalResponse():
Unit = {
+ val server = setUpJmapServer(Map("websocket.ping.interval" -> "2s"))
+
+ val bobPath = MailboxPath.inbox(BOB)
+ val accountId: AccountId = AccountId.fromUsername(BOB)
+ val mailboxId =
server.getProbe(classOf[MailboxProbeImpl]).createMailbox(bobPath)
+
+ val response: Either[String, List[String]] =
+ authenticatedRequest(server)
+ .response(asWebSocket[Identity, List[String]] {
+ ws =>
+ ws.send(WebSocketFrame.text(
+ """{
+ | "@type": "WebSocketPushEnable",
+ | "dataTypes": ["Mailbox", "Email"]
+ |}""".stripMargin))
+
+ Thread.sleep(100)
+
+ ws.send(WebSocketFrame.text(
+ s"""{
+ | "@type": "Request",
+ | "id": "req-36",
+ | "using": ["urn:ietf:params:jmap:core",
"urn:ietf:params:jmap:mail"],
+ | "methodCalls": [
+ | ["Email/set", {
+ | "accountId": "$ACCOUNT_ID",
+ | "create": {
+ | "aaaaaa":{
+ | "mailboxIds": {
+ | "${mailboxId.serialize}": true
+ | }
+ | }
+ | }
+ | }, "c1"]]
+ |}""".stripMargin))
+
+ List(ws.receive().asPayload,
+ ws.receive().asPayload)
+ })
+ .send(backend)
+ .body
+
+ Thread.sleep(100)
+
+ val jmapGuiceProbe: JmapGuiceProbe =
server.getProbe(classOf[JmapGuiceProbe])
+ val emailState: State = jmapGuiceProbe.getLatestEmailState(accountId)
+ val mailboxState: State = jmapGuiceProbe.getLatestMailboxState(accountId)
+
+ val globalState: String =
PushState.fromOption(Some(UuidState.fromJava(mailboxState)),
Some(UuidState.fromJava(emailState))).get.value
+ val stateChange: String =
s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState"}""".stripMargin
+
+ assertThat(response.toOption.get.asJava)
+ .hasSize(2)
+ .contains(stateChange)
+ }
+
+ private def authenticatedRequest(server: GuiceJamesServer):
RequestT[Identity, Either[String, String], Any] = {
+ val port = server.getProbe(classOf[JmapGuiceProbe])
+ .getJmapPort
+ .getValue
+ basicRequest.get(Uri.apply(new URI(s"ws://127.0.0.1:$port/jmap/ws")))
+ .header("Authorization", "Basic Ym9iQGRvbWFpbi50bGQ6Ym9icGFzc3dvcmQ=")
+ .header("Accept", ACCEPT_RFC8621_VERSION_HEADER)
+ }
+
+ private def sendEchoTextFrame(ws: WebSocket[Identity], requestId: String =
"req-36"): Identity[Unit] = {
+ ws.send(WebSocketFrame.text(
+ s"""{
+ | "@type": "Request",
+ | "id": "$requestId",
+ | "using": [ "urn:ietf:params:jmap:core"],
+ | "methodCalls": [
+ | [
+ | "Core/echo",
+ | {
+ | "arg1": "arg1data",
+ | "arg2": "arg2data"
+ | },
+ | "c1"
+ | ]
+ | ]
+ |}""".stripMargin))
+ }
+}
diff --git
a/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryWebSocketWithPingIntervalTest.java
b/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryWebSocketWithPingIntervalTest.java
new file mode 100644
index 0000000000..df7fa61580
--- /dev/null
+++
b/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryWebSocketWithPingIntervalTest.java
@@ -0,0 +1,65 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.rfc8621.memory;
+
+import static
org.apache.james.data.UsersRepositoryModuleChooser.Implementation.DEFAULT;
+
+import java.io.File;
+
+import org.apache.james.GuiceJamesServer;
+import org.apache.james.MemoryJamesConfiguration;
+import org.apache.james.MemoryJamesServerMain;
+import org.apache.james.jmap.rfc8621.contract.IdentityProbeModule;
+import
org.apache.james.jmap.rfc8621.contract.WebSocketWithPingIntervalContract;
+import org.apache.james.jmap.rfc8621.contract.probe.DelegationProbeModule;
+import org.apache.james.modules.TestJMAPServerModule;
+import org.junit.jupiter.api.io.TempDir;
+
+import com.github.fge.lambdas.Throwing;
+
+import scala.collection.immutable.Map;
+import scala.jdk.javaapi.CollectionConverters;
+
+public class MemoryWebSocketWithPingIntervalTest implements
WebSocketWithPingIntervalContract {
+ @TempDir
+ private File tmpDir;
+
+ private GuiceJamesServer guiceJamesServer;
+
+ @Override
+ public GuiceJamesServer startJmapServer(Map<String, Object>
overrideJmapProperties) {
+ guiceJamesServer =
MemoryJamesServerMain.createServer(MemoryJamesConfiguration.builder()
+ .workingDirectory(tmpDir)
+ .configurationFromClasspath()
+ .usersRepository(DEFAULT)
+ .enableJMAP()
+ .build())
+ .overrideWith(new
TestJMAPServerModule(CollectionConverters.asJava(overrideJmapProperties)), new
DelegationProbeModule(), new IdentityProbeModule());
+ Throwing.runnable(() -> guiceJamesServer.start()).run();
+ return guiceJamesServer;
+ }
+
+ @Override
+ public void stopJmapServer() {
+ if (guiceJamesServer != null && guiceJamesServer.isStarted()) {
+ guiceJamesServer.stop();
+ }
+ }
+}
diff --git
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/JmapRfc8621Configuration.scala
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/JmapRfc8621Configuration.scala
index 0c66fb16e7..c9f5098aa7 100644
---
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/JmapRfc8621Configuration.scala
+++
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/JmapRfc8621Configuration.scala
@@ -20,16 +20,20 @@
package org.apache.james.jmap.core
import java.net.URI
+import java.time.temporal.ChronoUnit
import java.util.Optional
+import com.google.common.base.Preconditions
import com.google.common.collect.ImmutableList
import org.apache.commons.configuration2.Configuration
import org.apache.james.jmap.core.CapabilityIdentifier.CapabilityIdentifier
import
org.apache.james.jmap.core.JmapRfc8621Configuration.{JMAP_EMAIL_GET_FULL_MAX_SIZE_DEFAULT,
JMAP_MAX_OBJECT_IN_GET, JMAP_MAX_OBJECT_IN_SET,
JMAP_UPLOAD_QUOTA_LIMIT_DEFAULT, MAX_SIZE_ATTACHMENTS_PER_MAIL_DEFAULT,
UPLOAD_LIMIT_DEFAULT}
import org.apache.james.jmap.pushsubscription.PushClientConfiguration
-import org.apache.james.util.Size
+import org.apache.james.util.{DurationParser, Size}
+import scala.concurrent.duration.Duration
import scala.jdk.CollectionConverters._
+import scala.jdk.DurationConverters._
import scala.jdk.OptionConverters._
object JmapConfigProperties {
@@ -37,6 +41,7 @@ object JmapConfigProperties {
val MAX_SIZE_ATTACHMENTS_PER_MAIL_PROPERTY: String =
"max.size.attachments.per.mail"
val URL_PREFIX_PROPERTY: String = "url.prefix"
val WEBSOCKET_URL_PREFIX_PROPERTY: String = "websocket.url.prefix"
+ val WEBSOCKET_PING_INTERVAL_PROPERTY: String = "websocket.ping.interval"
val WEB_PUSH_MAX_TIMEOUT_SECONDS_PROPERTY: String =
"webpush.maxTimeoutSeconds"
val WEB_PUSH_MAX_CONNECTIONS_PROPERTY: String = "webpush.maxConnections"
val WEB_PUSH_PREVENT_SERVER_SIDE_REQUEST_FORGERY: String =
"webpush.prevent.server.side.request.forgery"
@@ -70,6 +75,12 @@ object JmapRfc8621Configuration {
JmapRfc8621Configuration(
urlPrefixString =
Option(configuration.getString(URL_PREFIX_PROPERTY)).getOrElse(URL_PREFIX_DEFAULT),
websocketPrefixString =
Option(configuration.getString(WEBSOCKET_URL_PREFIX_PROPERTY)).getOrElse(WEBSOCKET_URL_PREFIX_DEFAULT),
+ websocketPingInterval =
Option(configuration.getString(WEBSOCKET_PING_INTERVAL_PROPERTY))
+ .map(DurationParser.parse(_, ChronoUnit.SECONDS))
+ .map(duration => {
+ Preconditions.checkArgument(!duration.isZero &&
!duration.isNegative, s"`$WEBSOCKET_PING_INTERVAL_PROPERTY` must be
positive".asInstanceOf[Object])
+ duration.toScala
+ }),
dynamicJmapPrefixResolutionEnabled =
configuration.getBoolean(DYNAMIC_JMAP_PREFIX_RESOLUTION_ENABLED_PROPERTY,
false),
supportsDelaySends = configuration.getBoolean(DELAY_SENDS_ENABLED,
false),
maxUploadSize = Option(configuration.getString(UPLOAD_LIMIT_PROPERTY,
null))
@@ -107,6 +118,7 @@ object JmapRfc8621Configuration {
case class JmapRfc8621Configuration(urlPrefixString: String,
websocketPrefixString: String,
+ websocketPingInterval: Option[Duration] =
None,
dynamicJmapPrefixResolutionEnabled:
Boolean = false,
supportsDelaySends: Boolean = false,
maxUploadSize: MaxSizeUpload =
UPLOAD_LIMIT_DEFAULT,
diff --git
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
index 1406fe87a3..fa8f92f09f 100644
---
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
+++
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
@@ -26,7 +26,7 @@ import java.util.{Optional, stream}
import com.google.common.collect.ImmutableMap
import io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE
-import io.netty.handler.codec.http.websocketx.WebSocketFrame
+import io.netty.handler.codec.http.websocketx.{PingWebSocketFrame,
TextWebSocketFrame, WebSocketFrame}
import io.netty.handler.codec.http.{HttpHeaderNames, HttpMethod}
import jakarta.inject.{Inject, Named}
import org.apache.james.core.{ConnectionDescription,
ConnectionDescriptionSupplier, Disconnector, Username}
@@ -51,9 +51,10 @@ import
reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST
import reactor.core.publisher.{Mono, Sinks}
import reactor.core.scala.publisher.{SFlux, SMono}
import reactor.core.scheduler.Schedulers
-import reactor.netty.http.server.{HttpServerRequest, HttpServerResponse}
+import reactor.netty.http.server.{HttpServerRequest, HttpServerResponse,
WebsocketServerSpec}
import reactor.netty.http.websocket.{WebsocketInbound, WebsocketOutbound}
+import scala.concurrent.duration.Duration
import scala.jdk.CollectionConverters._
object WebSocketRoutes {
@@ -75,6 +76,7 @@ case class ClientContext(outbound:
Sinks.Many[OutboundMessage], pushRegistration
}
class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val
authenticator: Authenticator,
+ val configuration: JmapRfc8621Configuration,
userProvisioner: UserProvisioning,
@Named(JMAPInjectionKeys.JMAP) eventBus:
EventBus,
jmapApi: JMAPApi,
@@ -87,6 +89,7 @@ class WebSocketRoutes @Inject()
(@Named(InjectionKeys.RFC_8621) val authenticato
private val openingConnectionsMetric: Metric =
metricFactory.generate("jmap_websocket_opening_connections_count")
private val requestCountMetric: Metric =
metricFactory.generate("jmap_websocket_requests_count")
private val connectedUsers:
java.util.concurrent.ConcurrentHashMap[ClientContext, ClientContext] = new
java.util.concurrent.ConcurrentHashMap[ClientContext, ClientContext]
+ private val websocketServerSpec: WebsocketServerSpec =
WebsocketServerSpec.builder.handlePing(false).build
override def routes(): stream.Stream[JMAPRoute] = stream.Stream.of(
JMAPRoute.builder
@@ -103,7 +106,7 @@ class WebSocketRoutes @Inject()
(@Named(InjectionKeys.RFC_8621) val authenticato
.flatMap((mailboxSession: MailboxSession) =>
userProvisioner.provisionUser(mailboxSession)
.`then`
.`then`(SMono(httpServerResponse.addHeader(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL,
"jmap")
- .sendWebsocket((in, out) =>
handleWebSocketConnection(mailboxSession)(in, out)))))
+ .sendWebsocket((in: WebsocketInbound, out: WebsocketOutbound) =>
handleWebSocketConnection(mailboxSession)(in, out), websocketServerSpec))))
.onErrorResume(throwable => handleHttpHandshakeError(throwable,
httpServerResponse))
.asJava()
.`then`()
@@ -115,11 +118,8 @@ class WebSocketRoutes @Inject()
(@Named(InjectionKeys.RFC_8621) val authenticato
val context = ClientContext(sink, new AtomicReference[Registration](),
session)
val responseFlux: SFlux[OutboundMessage] =
SFlux[WebSocketFrame](in.aggregateFrames()
.receiveFrames())
- .map(frame => {
- val bytes = new Array[Byte](frame.content().readableBytes)
- frame.content().readBytes(bytes)
- new String(bytes, StandardCharsets.UTF_8)
- })
+ .filter(frame => frame.isInstanceOf[TextWebSocketFrame])
+ .map(frame => frame.asInstanceOf[TextWebSocketFrame].text())
.doOnNext(_ => connectedUsers.put(context, context))
.doOnNext(_ => requestCountMetric.increment())
.flatMap(message => handleClientMessages(context)(message))
@@ -134,13 +134,21 @@ class WebSocketRoutes @Inject()
(@Named(InjectionKeys.RFC_8621) val authenticato
openingConnectionsMetric.decrement()
})
- out.sendString(
- SFlux.merge(Seq(responseFlux, sink.asFlux()))
- .map(pushSerializer.serialize)
- .map(Json.stringify))
- .`then`()
+ val responseAndSinkFlux: SFlux[WebSocketFrame] =
SFlux.merge(Seq(responseFlux, sink.asFlux()))
+ .map(pushSerializer.serialize)
+ .map(json => new TextWebSocketFrame(Json.stringify(json)))
+
+ val resultFlux: SFlux[WebSocketFrame] = configuration.websocketPingInterval
+ .map(interval =>
responseAndSinkFlux.mergeWith(pingMessagePublisher(interval)))
+ .getOrElse(responseAndSinkFlux)
+
+ out.sendObject(resultFlux).`then`()
}
+ private def pingMessagePublisher(duration: Duration): SFlux[WebSocketFrame] =
+ SFlux.interval(duration)
+ .map(_ => new PingWebSocketFrame())
+
private def handleClientMessages(clientContext: ClientContext)(message:
String): SMono[OutboundMessage] =
pushSerializer.deserializeWebSocketInboundMessage(message)
.fold(invalid => {
diff --git a/src/site/xdoc/server/config-jmap.xml
b/src/site/xdoc/server/config-jmap.xml
index 23cea6c482..d92f30ac9d 100644
--- a/src/site/xdoc/server/config-jmap.xml
+++ b/src/site/xdoc/server/config-jmap.xml
@@ -66,6 +66,11 @@
<dd>Optional. URL for JMAP WebSocket route</dd>
<dd>Default value: ws://localhost</dd>
+ <dt><strong>websocket.ping.interval</strong></dt>
+            <dd>Optional. Configures the interval
between consecutive ping messages (as specified in RFC 6455) sent by the server
to the client over a WebSocket connection.
+                The supported unit is seconds (e.g. <code>3s</code> for a
3-second interval). The value must be strictly positive.</dd>
+            <dd>Defaults to empty, which disables this feature.</dd>
+
<dt><strong>upload.max.size</strong></dt>
<dd>Optional. Configuration max size for each upload file
in new JMAP-RFC-8621.</dd>
<dd>Default value: 30M. Supported units are B (bytes) K
(KB) M (MB) G (GB).</dd>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]