This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit dfb6defc8aef5f51da9902552bb5a5cf1113bac5 Author: Benoit Tellier <[email protected]> AuthorDate: Sat May 15 09:43:55 2021 +0700 [FIX] JMAP draft metrics ended up not being published --- .../james/jmap/draft/methods/GetFilterMethod.java | 4 +- .../jmap/draft/methods/GetMailboxesMethod.java | 6 +-- .../jmap/draft/methods/GetMessageListMethod.java | 6 +-- .../jmap/draft/methods/GetMessagesMethod.java | 6 +-- .../apache/james/jmap/draft/methods/Method.java | 8 +--- .../jmap/draft/methods/SetMailboxesMethod.java | 42 ++++++++++--------- .../jmap/draft/methods/SetMailboxesProcessor.java | 8 ++++ .../jmap/draft/methods/SetMessagesMethod.java | 47 +++++++++++++--------- .../jmap/draft/methods/SetMessagesProcessor.java | 8 ++++ .../draft/methods/SetVacationResponseMethod.java | 6 +-- .../jmap/draft/methods/RequestHandlerTest.java | 11 ++--- 11 files changed, 87 insertions(+), 65 deletions(-) diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetFilterMethod.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetFilterMethod.java index 1f7eccb..45652ac 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetFilterMethod.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetFilterMethod.java @@ -74,8 +74,8 @@ public class GetFilterMethod implements Method { GetFilterRequest filterRequest = (GetFilterRequest) request; - return Flux.from(metricFactory.decorateSupplierWithTimerMetricLogP99(JMAP_PREFIX + METHOD_NAME.getName(), - () -> process(methodCallId, mailboxSession, filterRequest) + return Flux.from(metricFactory.decoratePublisherWithTimerMetricLogP99(JMAP_PREFIX + METHOD_NAME.getName(), + process(methodCallId, mailboxSession, filterRequest) .subscriberContext(context("GET_FILTER", MDCBuilder.of(MDCBuilder.ACTION, "GET_FILTER"))))); } diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java index 8381fdc..f0fe1fd 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java @@ -97,9 +97,9 @@ public class GetMailboxesMethod implements Method { Preconditions.checkArgument(request instanceof GetMailboxesRequest); GetMailboxesRequest mailboxesRequest = (GetMailboxesRequest) request; - return metricFactory.decorateSupplierWithTimerMetricLogP99(JMAP_PREFIX + METHOD_NAME.getName(), - () -> process(methodCallId, mailboxSession, mailboxesRequest) - .subscriberContext(context(ACTION, mdc(mailboxesRequest)))); + return Flux.from(metricFactory.decoratePublisherWithTimerMetricLogP99(JMAP_PREFIX + METHOD_NAME.getName(), + process(methodCallId, mailboxSession, mailboxesRequest) + .subscriberContext(context(ACTION, mdc(mailboxesRequest))))); } private MDCBuilder mdc(GetMailboxesRequest mailboxesRequest) { diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessageListMethod.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessageListMethod.java index caad396..df32c34 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessageListMethod.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessageListMethod.java @@ -113,9 +113,9 @@ public class GetMessageListMethod implements Method { GetMessageListRequest messageListRequest = (GetMessageListRequest) request; - return metricFactory.decorateSupplierWithTimerMetricLogP99(JMAP_PREFIX + METHOD_NAME.getName(), - () -> process(methodCallId, mailboxSession, messageListRequest) - .subscriberContext(context("GET_MESSAGE_LIST", mdc(messageListRequest)))); + return Flux.from(metricFactory.decoratePublisherWithTimerMetricLogP99(JMAP_PREFIX + METHOD_NAME.getName(), + process(methodCallId, mailboxSession, messageListRequest) + .subscriberContext(context("GET_MESSAGE_LIST", mdc(messageListRequest))))); } private MDCBuilder mdc(GetMessageListRequest messageListRequest) { diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessagesMethod.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessagesMethod.java index 1532525..5038d6b 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessagesMethod.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessagesMethod.java @@ -83,15 +83,15 @@ public class GetMessagesMethod implements Method { GetMessagesRequest getMessagesRequest = (GetMessagesRequest) request; MessageProperties outputProperties = getMessagesRequest.getProperties().toOutputProperties(); - return metricFactory.decorateSupplierWithTimerMetricLogP99(JMAP_PREFIX + METHOD_NAME.getName(), - () -> Flux.from(getMessagesResponse(mailboxSession, getMessagesRequest) + return Flux.from(metricFactory.decoratePublisherWithTimerMetricLogP99(JMAP_PREFIX + METHOD_NAME.getName(), + Flux.from(getMessagesResponse(mailboxSession, getMessagesRequest) .map(response -> JmapResponse.builder().methodCallId(methodCallId) .response(response) .responseName(RESPONSE_NAME) .properties(outputProperties.getOptionalMessageProperties()) .filterProvider(buildOptionalHeadersFilteringFilterProvider(outputProperties)) .build())) - .subscriberContext(context("GET_MESSAGES", mdc(getMessagesRequest)))); + .subscriberContext(context("GET_MESSAGES", mdc(getMessagesRequest))))); } private MDCBuilder mdc(GetMessagesRequest getMessagesRequest) { diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/Method.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/Method.java index 564f6de..8ff888c 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/Method.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/Method.java @@ -31,8 +31,6 @@ import com.fasterxml.jackson.annotation.JsonValue; import com.google.common.base.Preconditions; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; public interface Method { @@ -128,11 +126,7 @@ public interface Method { Class<? extends JmapRequest> requestType(); - default Flux<JmapResponse> process(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) { - return Mono.fromCallable(() -> processToStream(request, methodCallId, mailboxSession)) - .flatMapMany(Flux::fromStream) - .subscribeOn(Schedulers.elastic()); - } + Flux<JmapResponse> process(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession); default Stream<JmapResponse> processToStream(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) { return process(request, methodCallId, mailboxSession) diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMailboxesMethod.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMailboxesMethod.java index 9a0ffbf..e336dbc 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMailboxesMethod.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMailboxesMethod.java @@ -19,8 +19,10 @@ package org.apache.james.jmap.draft.methods; +import static org.apache.james.util.MDCBuilder.ACTION; +import static org.apache.james.util.ReactorUtils.context; + import java.util.Set; -import java.util.stream.Stream; import javax.inject.Inject; @@ -34,6 +36,9 @@ import org.apache.james.util.MDCBuilder; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + public class SetMailboxesMethod implements Method { private static final Request.Name METHOD_NAME = Request.name("setMailboxes"); @@ -59,7 +64,7 @@ public class SetMailboxesMethod implements Method { } @Override - public Stream<JmapResponse> processToStream(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) { + public Flux<JmapResponse> process(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) { Preconditions.checkNotNull(request); Preconditions.checkNotNull(methodCallId); Preconditions.checkNotNull(mailboxSession); @@ -67,29 +72,28 @@ public class SetMailboxesMethod implements Method { SetMailboxesRequest setMailboxesRequest = (SetMailboxesRequest) request; + return Flux.from(metricFactory.decoratePublisherWithTimerMetricLogP99(JMAP_PREFIX + METHOD_NAME.getName(), + setMailboxesResponse(setMailboxesRequest, mailboxSession) + .map(response -> JmapResponse.builder().methodCallId(methodCallId) + .response(response) + .responseName(RESPONSE_NAME) + .build()))) + .subscriberContext(context(ACTION, mdc(setMailboxesRequest))); + } + private MDCBuilder mdc(SetMailboxesRequest setMailboxesRequest) { return MDCBuilder.create() .addContext(MDCBuilder.ACTION, "SET_MAILBOXES") .addContext("create", setMailboxesRequest.getCreate()) .addContext("update", setMailboxesRequest.getUpdate()) - .addContext("destroy", setMailboxesRequest.getDestroy()) - .wrapArround( - () -> metricFactory.decorateSupplierWithTimerMetricLogP99(JMAP_PREFIX + METHOD_NAME.getName(), - () -> Stream.of( - JmapResponse.builder().methodCallId(methodCallId) - .response(setMailboxesResponse(setMailboxesRequest, mailboxSession)) - .responseName(RESPONSE_NAME) - .build()))) - .get(); + .addContext("destroy", setMailboxesRequest.getDestroy()); } - private SetMailboxesResponse setMailboxesResponse(SetMailboxesRequest request, MailboxSession mailboxSession) { - return processors.stream() - .map(processor -> processor.process(request, mailboxSession)) - .reduce(SetMailboxesResponse.builder(), - (builder, resp) -> resp.mergeInto(builder), - (builder1, builder2) -> builder2.build().mergeInto(builder1) - ) - .build(); + private Mono<SetMailboxesResponse> setMailboxesResponse(SetMailboxesRequest request, MailboxSession mailboxSession) { + return Flux.fromIterable(processors) + .flatMap(processor -> processor.processReactive(request, mailboxSession)) + .reduce(SetMailboxesResponse.builder(), + (builder, resp) -> resp.mergeInto(builder)) + .map(SetMailboxesResponse.Builder::build); } } \ No newline at end of file diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMailboxesProcessor.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMailboxesProcessor.java index 13c8b73..d41ba6b 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMailboxesProcessor.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMailboxesProcessor.java @@ -23,8 +23,16 @@ import org.apache.james.jmap.draft.model.SetMailboxesRequest; import org.apache.james.jmap.draft.model.SetMailboxesResponse; import org.apache.james.mailbox.MailboxSession; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + public interface SetMailboxesProcessor { SetMailboxesResponse process(SetMailboxesRequest request, MailboxSession mailboxSession); + + default Mono<SetMailboxesResponse> processReactive(SetMailboxesRequest request, MailboxSession mailboxSession) { + return Mono.fromCallable(() -> process(request, mailboxSession)) + .subscribeOn(Schedulers.elastic()); + } } diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesMethod.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesMethod.java index 05ecc44..e6e5260 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesMethod.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesMethod.java @@ -19,8 +19,10 @@ package org.apache.james.jmap.draft.methods; +import static org.apache.james.util.MDCBuilder.ACTION; +import static org.apache.james.util.ReactorUtils.context; + import java.util.Set; -import java.util.stream.Stream; import javax.inject.Inject; @@ -34,6 +36,9 @@ import org.apache.james.util.MDCBuilder; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + public class SetMessagesMethod implements Method { private static final Method.Request.Name METHOD_NAME = Method.Request.name("setMessages"); @@ -59,34 +64,36 @@ public class SetMessagesMethod implements Method { } @Override - public Stream<JmapResponse> processToStream(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) { + public Flux<JmapResponse> process(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) { Preconditions.checkArgument(request instanceof SetMessagesRequest); SetMessagesRequest setMessagesRequest = (SetMessagesRequest) request; + return Flux.from(metricFactory.decoratePublisherWithTimerMetricLogP99(JMAP_PREFIX + METHOD_NAME.getName(), + setMessagesResponse(setMessagesRequest, mailboxSession) + .map(responses -> + JmapResponse.builder().methodCallId(methodCallId) + .response(responses) + .responseName(RESPONSE_NAME) + .build()))) + .subscriberContext(context(ACTION, mdc(setMessagesRequest))); + } + + + private MDCBuilder mdc(SetMessagesRequest setMessagesRequest) { return MDCBuilder.create() - .addContext(MDCBuilder.ACTION, "SET_MESSAGES") + .addContext(ACTION, "SET_MESSAGES") .addContext("accountId", setMessagesRequest.getAccountId()) .addContext("create", setMessagesRequest.getCreate()) .addContext("destroy", setMessagesRequest.getDestroy()) - .addContext("ifInState", setMessagesRequest.getIfInState()) - .wrapArround( - () -> metricFactory.decorateSupplierWithTimerMetricLogP99(JMAP_PREFIX + METHOD_NAME.getName(), - () -> Stream.of( - JmapResponse.builder().methodCallId(methodCallId) - .response(setMessagesResponse(setMessagesRequest, mailboxSession)) - .responseName(RESPONSE_NAME) - .build()))) - .get(); + .addContext("ifInState", setMessagesRequest.getIfInState()); } - private SetMessagesResponse setMessagesResponse(SetMessagesRequest request, MailboxSession mailboxSession) { - return messagesProcessors.stream() - .map(processor -> processor.process(request, mailboxSession)) - .reduce(SetMessagesResponse.builder(), - (builder, resp) -> resp.mergeInto(builder), - (builder1, builder2) -> builder2.build().mergeInto(builder1) - ) - .build(); + private Mono<SetMessagesResponse> setMessagesResponse(SetMessagesRequest request, MailboxSession mailboxSession) { + return Flux.fromIterable(messagesProcessors) + .flatMap(processor -> processor.processReactive(request, mailboxSession)) + .reduce(SetMessagesResponse.builder(), + (builder, resp) -> resp.mergeInto(builder)) + .map(SetMessagesResponse.Builder::build); } } diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesProcessor.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesProcessor.java index f7f183f..787d9f8 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesProcessor.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesProcessor.java @@ -23,6 +23,14 @@ import org.apache.james.jmap.draft.model.SetMessagesRequest; import org.apache.james.jmap.draft.model.SetMessagesResponse; import org.apache.james.mailbox.MailboxSession; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + public interface SetMessagesProcessor { SetMessagesResponse process(SetMessagesRequest request, MailboxSession mailboxSession); + + default Mono<SetMessagesResponse> processReactive(SetMessagesRequest request, MailboxSession mailboxSession) { + return Mono.fromCallable(() -> process(request, mailboxSession)) + .subscribeOn(Schedulers.elastic()); + } } diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetVacationResponseMethod.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetVacationResponseMethod.java index 3b41a6c..5ba73ac 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetVacationResponseMethod.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetVacationResponseMethod.java @@ -80,10 +80,10 @@ public class SetVacationResponseMethod implements Method { Preconditions.checkArgument(request instanceof SetVacationRequest); SetVacationRequest setVacationRequest = (SetVacationRequest) request; - return metricFactory.decorateSupplierWithTimerMetricLogP99(JMAP_PREFIX + METHOD_NAME.getName(), - () -> process(methodCallId, mailboxSession, setVacationRequest) + return Flux.from(metricFactory.decoratePublisherWithTimerMetricLogP99(JMAP_PREFIX + METHOD_NAME.getName(), + process(methodCallId, mailboxSession, setVacationRequest) .subscriberContext(jmapAction("SET_VACATION")) - .subscriberContext(context("set-vacation", MDCBuilder.of("update", setVacationRequest.getUpdate())))); + .subscriberContext(context("set-vacation", MDCBuilder.of("update", setVacationRequest.getUpdate()))))); } private Flux<JmapResponse> process(MethodCallId methodCallId, MailboxSession mailboxSession, SetVacationRequest setVacationRequest) { diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/RequestHandlerTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/RequestHandlerTest.java index 12c9713..0cd5e5c 100644 --- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/RequestHandlerTest.java +++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/RequestHandlerTest.java @@ -23,7 +23,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.tuple; import java.util.List; -import java.util.stream.Stream; import javax.inject.Inject; @@ -47,6 +46,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; +import reactor.core.publisher.Flux; + public class RequestHandlerTest { public static class TestJmapRequest implements JmapRequest { @@ -105,10 +106,10 @@ public class RequestHandlerTest { } @Override - public Stream<JmapResponse> processToStream(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) { + public Flux<JmapResponse> process(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) { Preconditions.checkArgument(request instanceof TestJmapRequest); TestJmapRequest typedRequest = (TestJmapRequest) request; - return Stream.of( + return Flux.just( JmapResponse.builder() .response(new TestJmapResponse(typedRequest.getId(), typedRequest.getName(), "works")) .responseName(Response.name("test")) @@ -192,8 +193,8 @@ public class RequestHandlerTest { } @Override - public Stream<JmapResponse> processToStream(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) { - return null; + public Flux<JmapResponse> process(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) { + return Flux.empty(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
