This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push: new d7a4f45fa4 JAMES-3539 PushSubscription/get should return expired subscriptions (#1845) d7a4f45fa4 is described below commit d7a4f45fa4d248a4911612eb367cef72e56c5f8c Author: Trần Hồng Quân <55171818+quantranhong1...@users.noreply.github.com> AuthorDate: Mon Dec 11 13:45:53 2023 +0700 JAMES-3539 PushSubscription/get should return expired subscriptions (#1845) --- .../CassandraPushSubscriptionRepository.java | 7 ++--- .../MemoryPushSubscriptionRepository.java | 7 ++--- .../PushSubscriptionRepositoryContract.scala | 34 ++++++---------------- .../james/jmap/pushsubscription/PushListener.scala | 6 +++- .../jmap/pushsubscription/PushListenerTest.scala | 2 +- 5 files changed, 19 insertions(+), 37 deletions(-) diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionRepository.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionRepository.java index 47dd50f047..ae6f5748d4 100644 --- a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionRepository.java +++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionRepository.java @@ -22,7 +22,6 @@ package org.apache.james.jmap.cassandra.pushsubscription; import static org.apache.james.jmap.api.pushsubscription.PushSubscriptionHelpers.evaluateExpiresTime; import static org.apache.james.jmap.api.pushsubscription.PushSubscriptionHelpers.isInThePast; import static org.apache.james.jmap.api.pushsubscription.PushSubscriptionHelpers.isInvalidPushSubscriptionKey; -import static org.apache.james.jmap.api.pushsubscription.PushSubscriptionHelpers.isNotOutdatedSubscription; import java.time.Clock; import java.time.ZonedDateTime; @@ -130,14 +129,12 @@ public class CassandraPushSubscriptionRepository implements PushSubscriptionRepo @Override public Publisher<PushSubscription> get(Username username, Set<PushSubscriptionId> ids) { return dao.selectAll(username) - .filter(subscription -> ids.contains(subscription.id())) - .filter(subscription -> isNotOutdatedSubscription(subscription, clock)); + .filter(subscription -> ids.contains(subscription.id())); } @Override public Publisher<PushSubscription> list(Username username) { - return dao.selectAll(username) - .filter(subscription -> isNotOutdatedSubscription(subscription, clock)); + return dao.selectAll(username); } private Mono<PushSubscription> retrieveByPushSubscriptionId(Username username, PushSubscriptionId id) { diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/pushsubscription/MemoryPushSubscriptionRepository.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/pushsubscription/MemoryPushSubscriptionRepository.java index c7cf16ac85..3033409414 100644 --- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/pushsubscription/MemoryPushSubscriptionRepository.java +++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/pushsubscription/MemoryPushSubscriptionRepository.java @@ -22,7 +22,6 @@ package org.apache.james.jmap.memory.pushsubscription; import static org.apache.james.jmap.api.pushsubscription.PushSubscriptionHelpers.evaluateExpiresTime; import static org.apache.james.jmap.api.pushsubscription.PushSubscriptionHelpers.isInThePast; import static org.apache.james.jmap.api.pushsubscription.PushSubscriptionHelpers.isInvalidPushSubscriptionKey; -import static org.apache.james.jmap.api.pushsubscription.PushSubscriptionHelpers.isNotOutdatedSubscription; import java.time.Clock; import java.time.ZonedDateTime; @@ -126,15 +125,13 @@ public class MemoryPushSubscriptionRepository implements PushSubscriptionReposit public Publisher<PushSubscription> get(Username username, Set<PushSubscriptionId> ids) { return Flux.fromStream(table.row(username).entrySet().stream()) .filter(entry -> ids.contains(entry.getKey())) - .map(Map.Entry::getValue) - .filter(subscription -> isNotOutdatedSubscription(subscription, clock)); + .map(Map.Entry::getValue); } @Override public Publisher<PushSubscription> list(Username username) { return Flux.fromStream(table.row(username).entrySet().stream()) - .map(Map.Entry::getValue) - .filter(subscription -> isNotOutdatedSubscription(subscription, clock)); + .map(Map.Entry::getValue); } @Override diff --git a/server/data/data-jmap/src/test/scala/org/apache/james/jmap/api/pushsubscription/PushSubscriptionRepositoryContract.scala b/server/data/data-jmap/src/test/scala/org/apache/james/jmap/api/pushsubscription/PushSubscriptionRepositoryContract.scala index 209563d0e9..c499dfe13d 100644 --- a/server/data/data-jmap/src/test/scala/org/apache/james/jmap/api/pushsubscription/PushSubscriptionRepositoryContract.scala +++ b/server/data/data-jmap/src/test/scala/org/apache/james/jmap/api/pushsubscription/PushSubscriptionRepositoryContract.scala @@ -316,27 +316,19 @@ trait PushSubscriptionRepositoryContract { } @Test - def getSubscriptionShouldNotReturnOutdatedSubscriptions(): Unit = { - val deviceClientId1 = DeviceClientId("1") - val deviceClientId2 = DeviceClientId("2") + def getSubscriptionShouldReturnExpiredSubscriptions(): Unit = { val validRequest1 = PushSubscriptionCreationRequest( - deviceClientId = deviceClientId1, + deviceClientId = DeviceClientId("1"), url = PushSubscriptionServerURL(new URL("https://example.com/push")), expires = Option(PushSubscriptionExpiredTime(VALID_EXPIRE.plusDays(1))), types = Seq(CustomTypeName1)) - val validRequest2 = PushSubscriptionCreationRequest( - deviceClientId = deviceClientId2, - url = PushSubscriptionServerURL(new URL("https://example.com/push")), - expires = Option(PushSubscriptionExpiredTime(VALID_EXPIRE.plusDays(3))), - types = Seq(CustomTypeName2)) - val pushSubscriptionId1 = SMono.fromPublisher(testee.save(ALICE, validRequest1)).block().id - val pushSubscriptionId2 = SMono.fromPublisher(testee.save(ALICE, validRequest2)).block().id + val pushSubscriptionId = SMono.fromPublisher(testee.save(ALICE, validRequest1)).block().id clock.setInstant(VALID_EXPIRE.plusDays(2).toInstant) - val pushSubscriptions = SFlux.fromPublisher(testee.get(ALICE, Set(pushSubscriptionId1, pushSubscriptionId2).asJava)).collectSeq().block() + val pushSubscriptions = SFlux.fromPublisher(testee.get(ALICE, Set(pushSubscriptionId).asJava)).collectSeq().block() - assertThat(pushSubscriptions.map(_.id).toList.asJava).containsExactlyInAnyOrder(pushSubscriptionId2) + assertThat(pushSubscriptions.map(_.id).toList.asJava).containsOnly(pushSubscriptionId) } @Test @@ -363,27 +355,19 @@ trait PushSubscriptionRepositoryContract { } @Test - def listSubscriptionShouldNotReturnOutdatedSubscriptions(): Unit = { - val deviceClientId1 = DeviceClientId("1") - val deviceClientId2 = DeviceClientId("2") + def listSubscriptionShouldReturnExpiredSubscriptions(): Unit = { val validRequest1 = PushSubscriptionCreationRequest( - deviceClientId = deviceClientId1, + deviceClientId = DeviceClientId("1"), url = PushSubscriptionServerURL(new URL("https://example.com/push")), expires = Option(PushSubscriptionExpiredTime(VALID_EXPIRE.plusDays(1))), types = Seq(CustomTypeName1)) - val validRequest2 = PushSubscriptionCreationRequest( - deviceClientId = deviceClientId2, - url = PushSubscriptionServerURL(new URL("https://example.com/push")), - expires = Option(PushSubscriptionExpiredTime(VALID_EXPIRE.plusDays(3))), - types = Seq(CustomTypeName2)) - SMono.fromPublisher(testee.save(ALICE, validRequest1)).block().id - val pushSubscriptionId2 = SMono.fromPublisher(testee.save(ALICE, validRequest2)).block().id + val pushSubscriptionId = SMono.fromPublisher(testee.save(ALICE, validRequest1)).block().id clock.setInstant(VALID_EXPIRE.plusDays(2).toInstant) val pushSubscriptions = SFlux.fromPublisher(testee.list(ALICE)).collectSeq().block() - assertThat(pushSubscriptions.map(_.id).toList.asJava).containsExactlyInAnyOrder(pushSubscriptionId2) + assertThat(pushSubscriptions.map(_.id).toList.asJava).containsOnly(pushSubscriptionId) } @Test diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/pushsubscription/PushListener.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/pushsubscription/PushListener.scala index c64a0913b6..3ce9df64ee 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/pushsubscription/PushListener.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/pushsubscription/PushListener.scala @@ -20,6 +20,7 @@ package org.apache.james.jmap.pushsubscription import java.nio.charset.StandardCharsets +import java.time.Clock import java.util.Base64 import com.google.common.annotations.VisibleForTesting @@ -28,6 +29,7 @@ import javax.inject.Inject import org.apache.james.events.EventListener.ReactiveGroupEventListener import org.apache.james.events.{Event, Group} import org.apache.james.jmap.api.model.PushSubscription +import org.apache.james.jmap.api.pushsubscription.PushSubscriptionHelpers.isNotOutdatedSubscription import org.apache.james.jmap.api.pushsubscription.PushSubscriptionRepository import org.apache.james.jmap.change.{EmailDeliveryTypeName, StateChangeEvent} import org.apache.james.jmap.core.StateChange @@ -61,7 +63,8 @@ object PushListener { class PushListener @Inject()(pushRepository: PushSubscriptionRepository, webPushClient: WebPushClient, pushSerializer: PushSerializer, - delegationStore: DelegationStore) extends ReactiveGroupEventListener { + delegationStore: DelegationStore, + clock: Clock) extends ReactiveGroupEventListener { override def getDefaultGroup: Group = PushListenerGroup() @@ -71,6 +74,7 @@ class PushListener @Inject()(pushRepository: PushSubscriptionRepository, SMono.just(event.username) .concatWith(delegationStore.authorizedUsers(event.username)) .flatMap(pushRepository.list) + .filter(isNotOutdatedSubscription(_, clock)) .filter(_.validated) .flatMap(sendNotification(_, event), ReactorUtils.DEFAULT_CONCURRENCY) .`then`() diff --git a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/pushsubscription/PushListenerTest.scala b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/pushsubscription/PushListenerTest.scala index bae016a02c..53ef0a4c7a 100644 --- a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/pushsubscription/PushListenerTest.scala +++ b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/pushsubscription/PushListenerTest.scala @@ -69,7 +69,7 @@ class PushListenerTest { pushSubscriptionRepository = new MemoryPushSubscriptionRepository(Clock.systemUTC()) webPushClient = mock(classOf[WebPushClient]) delegationStore = new MemoryDelegationStore() - testee = new PushListener(pushSubscriptionRepository, webPushClient, pushSerializer, delegationStore) + testee = new PushListener(pushSubscriptionRepository, webPushClient, pushSerializer, delegationStore, Clock.systemUTC()) when(webPushClient.push(any(), any())).thenReturn(SMono.empty[Unit]) } --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org