This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 2a9ef7bbc56622aee38c5a2814e8a3669efe5aa5 Author: Rene Cordier <[email protected]> AuthorDate: Tue Apr 7 11:31:37 2026 +0700 JAMES-4197 Add a DeserializationResult interface for deserialization results in EventSerializer --- .../apache/james/events/DeserializationResult.java | 84 ++++++++++++++++++++++ .../org/apache/james/events/EventSerializer.java | 9 ++- .../james/events/EventSerializersAggregator.java | 15 ++-- .../apache/james/events/EventBusTestFixture.java | 22 +++--- .../james/events/CassandraEventDeadLettersDAO.java | 2 +- .../org/apache/james/events/GroupRegistration.java | 4 +- .../james/events/GroupRegistrationHandler.java | 5 +- .../james/events/KeyRegistrationHandler.java | 11 +-- .../apache/james/events/RabbitMQEventBusTest.java | 2 +- .../james/events/PostgresEventDeadLetters.java | 2 +- .../james/event/json/MailboxEventSerializer.scala | 16 ++--- .../james/jmap/change/JmapEventSerializer.scala | 14 ++-- 12 files changed, 135 insertions(+), 51 deletions(-) diff --git a/event-bus/api/src/main/java/org/apache/james/events/DeserializationResult.java b/event-bus/api/src/main/java/org/apache/james/events/DeserializationResult.java new file mode 100644 index 0000000000..4fc4b02b6a --- /dev/null +++ b/event-bus/api/src/main/java/org/apache/james/events/DeserializationResult.java @@ -0,0 +1,84 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.events; + +import java.util.List; +import java.util.Optional; + +import com.google.common.collect.ImmutableList; + +public sealed interface DeserializationResult permits DeserializationResult.Success, DeserializationResult.SuccessList, DeserializationResult.Failure { + static DeserializationResult of(Optional<Event> maybeEvent, String throwingMessage) { + return maybeEvent.<DeserializationResult>map(Success::new) + .orElse(new Failure(throwingMessage)); + } + + static DeserializationResult ofList(Optional<List<Event>> maybeEvents, String throwingMessage) { + return maybeEvents.<DeserializationResult>map(SuccessList::new) + .orElse(new Failure(throwingMessage)); + } + + Event event(); + + List<Event> events(); + + default boolean isSuccess() { + return this instanceof Success || this instanceof SuccessList; + } + + record Success(Event event) implements DeserializationResult { + @Override + public Event event() { + return event; + } + + @Override + public List<Event> events() { + return ImmutableList.of(event); + } + } + + record SuccessList(List<Event> events) implements DeserializationResult { + @Override + public Event event() { + if (events.size() != 1) { + throw new IllegalStateException("Expected exactly one event but got " + events.size()); + } + return events.getFirst(); + } + + @Override + public List<Event> events() { + return events; + } + } + + record Failure(String throwingMessage) implements DeserializationResult { + @Override + public Event event() { + throw new RuntimeException(throwingMessage); + } + + @Override + public List<Event> events() { + throw new RuntimeException(throwingMessage); + } + } +} diff --git a/event-bus/api/src/main/java/org/apache/james/events/EventSerializer.java b/event-bus/api/src/main/java/org/apache/james/events/EventSerializer.java index c3bf6b3869..59b5d54a6a 100644 --- a/event-bus/api/src/main/java/org/apache/james/events/EventSerializer.java +++ b/event-bus/api/src/main/java/org/apache/james/events/EventSerializer.java @@ -21,7 +21,6 @@ package org.apache.james.events; import java.nio.charset.StandardCharsets; import java.util.Collection; -import java.util.List; import java.util.Optional; public interface EventSerializer { @@ -37,15 +36,15 @@ public interface EventSerializer { return toJson(event).map(json -> json.getBytes(StandardCharsets.UTF_8)); } - Optional<Event> asEvent(String serialized); + DeserializationResult asEvent(String serialized); - Optional<List<Event>> asEvents(String serialized); + DeserializationResult asEvents(String serialized); - default Optional<Event> fromBytes(byte[] serialized) { + default DeserializationResult fromBytes(byte[] serialized) { return asEvent(new String(serialized, StandardCharsets.UTF_8)); } - default Optional<List<Event>> asEventsFromBytes(byte[] serialized) { + default DeserializationResult asEventsFromBytes(byte[] serialized) { return asEvents(new String(serialized, StandardCharsets.UTF_8)); } } diff --git a/event-bus/api/src/main/java/org/apache/james/events/EventSerializersAggregator.java b/event-bus/api/src/main/java/org/apache/james/events/EventSerializersAggregator.java index 86b8baf3f0..f663567f16 100644 --- a/event-bus/api/src/main/java/org/apache/james/events/EventSerializersAggregator.java +++ b/event-bus/api/src/main/java/org/apache/james/events/EventSerializersAggregator.java @@ -20,7 +20,6 @@ package org.apache.james.events; import java.util.Collection; -import java.util.List; import java.util.Optional; import java.util.Set; @@ -45,11 +44,12 @@ public class EventSerializersAggregator implements EventSerializer { } @Override - public Optional<Event> asEvent(String serialized) { + public DeserializationResult asEvent(String serialized) { return allEventSerializers.stream() .map(eventSerializer -> eventSerializer.asEvent(serialized)) - .flatMap(Optional::stream) - .findFirst(); + .filter(DeserializationResult::isSuccess) + .findFirst() + .orElse(new DeserializationResult.Failure("Could not deserialize event: " + serialized)); } @Override @@ -61,10 +61,11 @@ public class EventSerializersAggregator implements EventSerializer { } @Override - public Optional<List<Event>> asEvents(String serialized) { + public DeserializationResult asEvents(String serialized) { return allEventSerializers.stream() .map(eventSerializer -> eventSerializer.asEvents(serialized)) - .flatMap(Optional::stream) - .findFirst(); + .filter(DeserializationResult::isSuccess) + .findFirst() + .orElse(new DeserializationResult.Failure("Could not deserialize events: " + serialized)); } } diff --git a/event-bus/api/src/test/java/org/apache/james/events/EventBusTestFixture.java b/event-bus/api/src/test/java/org/apache/james/events/EventBusTestFixture.java index b56985cfc9..af0d419193 100644 --- a/event-bus/api/src/test/java/org/apache/james/events/EventBusTestFixture.java +++ b/event-bus/api/src/test/java/org/apache/james/events/EventBusTestFixture.java @@ -186,18 +186,18 @@ public interface EventBusTestFixture { } @Override - public Optional<Event> asEvent(String serialized) { + public DeserializationResult asEvent(String serialized) { if (!serialized.contains("&") || serialized.contains(ARRAY_SEPARATOR)) { - return Optional.empty(); + return new DeserializationResult.Failure("Invalid format: " + serialized); } List<String> parts = Splitter.on("&").splitToList(serialized); if (!parts.get(0).equals(TestEvent.class.getCanonicalName())) { - return Optional.empty(); + return new DeserializationResult.Failure("Unknown event type: " + parts.get(0)); } Event.EventId eventId = Event.EventId.of(UUID.fromString(parts.get(1))); Username username = Username.of(Joiner.on("&").join(parts.stream().skip(2).collect(ImmutableList.toImmutableList()))); - return Optional.of(new TestEvent(eventId, username)); + return new DeserializationResult.Success(new TestEvent(eventId, username)); } @Override @@ -209,12 +209,14 @@ public interface EventBusTestFixture { } @Override - public Optional<List<Event>> asEvents(String serialized) { - return Optional.of(Splitter.on(ARRAY_SEPARATOR) - .splitToStream(serialized) - .map(this::asEvent) - .map(Optional::get) - .collect(ImmutableList.toImmutableList())); + public DeserializationResult asEvents(String serialized) { + return DeserializationResult.ofList( + Optional.of(Splitter.on(ARRAY_SEPARATOR) + .splitToStream(serialized) + .map(this::asEvent) + .map(DeserializationResult::event) + .collect(ImmutableList.toImmutableList())), + "Could not deserialize events"); } } diff --git a/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersDAO.java b/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersDAO.java index d7a547c643..183a3a733e 100644 --- a/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersDAO.java +++ b/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersDAO.java @@ -143,6 +143,6 @@ public class CassandraEventDeadLettersDAO { private Event deserializeEvent(String serializedEvent) { return eventSerializer.asEvent(serializedEvent) - .orElseThrow(() -> new RuntimeException("Could not deserialize event: " + serializedEvent)); + .event(); } } diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java index a3be6d6eeb..cecd93c61d 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java @@ -27,7 +27,6 @@ import static org.apache.james.backends.rabbitmq.Constants.evaluateAutoDelete; import static org.apache.james.backends.rabbitmq.Constants.evaluateDurable; import static org.apache.james.backends.rabbitmq.Constants.evaluateExclusive; -import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -178,8 +177,7 @@ class GroupRegistration implements Registration { } private Mono<Event> deserializeEvent(byte[] eventAsBytes) { - return Mono.fromCallable(() -> eventSerializer.fromBytes(eventAsBytes) - .orElseThrow(() -> new RuntimeException("Could not deserialize event: " + new String(eventAsBytes, StandardCharsets.UTF_8)))) + return Mono.fromCallable(() -> eventSerializer.fromBytes(eventAsBytes).event()) .subscribeOn(Schedulers.parallel()); } diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java index 69e1add06a..46251ae7ff 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java @@ -152,9 +152,8 @@ public class GroupRegistrationHandler { } private Mono<List<Event>> deserializeEvents(byte[] eventAsBytes) { - return Mono.fromCallable(() -> - eventSerializer.asEventsFromBytes(eventAsBytes) - .orElseThrow(() -> new RuntimeException("Could not deserialize events"))); + return Mono.fromCallable(() -> eventSerializer.asEventsFromBytes(eventAsBytes) + .events()); } void stop() { diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java index 3a9c371678..aef8e8a88c 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java @@ -26,7 +26,6 @@ import static org.apache.james.backends.rabbitmq.Constants.evaluateDurable; import static org.apache.james.backends.rabbitmq.Constants.evaluateExclusive; import static org.apache.james.events.RabbitMQEventBus.EVENT_BUS_ID; -import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.List; import java.util.Optional; @@ -222,9 +221,11 @@ class KeyRegistrationHandler { return deserializeEvents(bodyAsBytes); } - return eventSerializer.fromBytes(bodyAsBytes) - .map(List::of) - .orElse(deserializeEvents(bodyAsBytes)); + DeserializationResult result = eventSerializer.fromBytes(bodyAsBytes); + if (result.isSuccess()) { + return List.of(result.event()); + } + return deserializeEvents(bodyAsBytes); } private StructuredLogger structuredLogger(List<Event> events, RegistrationKey key) { @@ -243,6 +244,6 @@ class KeyRegistrationHandler { private List<Event> deserializeEvents(byte[] bodyAsBytes) { return eventSerializer.asEventsFromBytes(bodyAsBytes) - .orElseThrow(() -> new RuntimeException("Could not deserialize events: " + new String(bodyAsBytes, StandardCharsets.UTF_8))); + .events(); } } diff --git a/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java b/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java index 38eb279be2..b1e531cd8a 100644 --- a/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java +++ b/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java @@ -431,7 +431,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, .blockFirst() .getBody(); - return eventSerializer.asEvent(new String(eventInBytes, StandardCharsets.UTF_8)).get(); + return eventSerializer.asEvent(new String(eventInBytes, StandardCharsets.UTF_8)).event(); } } } diff --git a/event-bus/postgres/src/main/java/org/apache/james/events/PostgresEventDeadLetters.java b/event-bus/postgres/src/main/java/org/apache/james/events/PostgresEventDeadLetters.java index 4c4bb07357..871b1f34e9 100644 --- a/event-bus/postgres/src/main/java/org/apache/james/events/PostgresEventDeadLetters.java +++ b/event-bus/postgres/src/main/java/org/apache/james/events/PostgresEventDeadLetters.java @@ -92,7 +92,7 @@ public class PostgresEventDeadLetters implements EventDeadLetters { private Event deserializeEvent(Record record) { String serializedEvent = record.get(EVENT); return eventSerializer.asEvent(serializedEvent) - .orElseThrow(() -> new RuntimeException("Could not deserialize event: " + serializedEvent)); + .event(); } @Override diff --git a/mailbox/event/json/src/main/scala/org/apache/james/event/json/MailboxEventSerializer.scala b/mailbox/event/json/src/main/scala/org/apache/james/event/json/MailboxEventSerializer.scala index b0ddc58daa..6a21ed08a9 100644 --- a/mailbox/event/json/src/main/scala/org/apache/james/event/json/MailboxEventSerializer.scala +++ b/mailbox/event/json/src/main/scala/org/apache/james/event/json/MailboxEventSerializer.scala @@ -30,7 +30,7 @@ import org.apache.james.core.quota.{QuotaCountLimit, QuotaCountUsage, QuotaSizeL import org.apache.james.event.json.DTOs.SystemFlag.SystemFlag import org.apache.james.event.json.DTOs._ import org.apache.james.events.Event.EventId -import org.apache.james.events.{EventSerializer, Event => JavaEvent} +import org.apache.james.events.{DeserializationResult, EventSerializer, Event => JavaEvent} import org.apache.james.mailbox.MailboxSession.SessionId import org.apache.james.mailbox.events.MailboxEvents.Added.IS_APPENDED import org.apache.james.mailbox.events.MailboxEvents.{Added => JavaAdded, Expunged => JavaExpunged, FlagsUpdated => JavaFlagsUpdated, MailboxACLUpdated => JavaMailboxACLUpdated, MailboxAdded => JavaMailboxAdded, MailboxDeletion => JavaMailboxDeletion, MailboxRenamed => JavaMailboxRenamed, MailboxSubscribedEvent => JavaMailboxSubscribedEvent, MailboxUnsubscribedEvent => JavaMailboxUnsubscribedEvent, MessageContentDeletionEvent => JavaMessageContentDeletionEvent, QuotaUsageUpdatedEvent => [...] @@ -466,21 +466,19 @@ class MailboxEventSerializer @Inject()(mailboxIdFactory: MailboxId.Factory, mess override def toJson(event: util.Collection[JavaEvent]): Optional[String] = Optional.of(jsonSerialize.toJson(event)) - override def asEvents(serialized: String): Optional[util.List[JavaEvent]] = { + override def asEvents(serialized: String): DeserializationResult = { val result = jsonSerialize.fromJsonAsEvents(serialized) if (result.isError) { - Optional.empty() - } else { - Optional.of(result.get.asJava) + List(new DeserializationResult.Failure(s"Could not deserialize events: $serialized")).asJava } + new DeserializationResult.SuccessList(result.get.asJava) } - override def asEvent(serialized: String): Optional[JavaEvent] = { + override def asEvent(serialized: String): DeserializationResult = { val result = fromJson(serialized) if (result.isError) { - Optional.empty() - } else { - Optional.of(result.get) + new DeserializationResult.Failure(s"Could not deserialize event: $serialized") } + new DeserializationResult.Success(result.get) } } diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/JmapEventSerializer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/JmapEventSerializer.scala index 08118e10f8..65eb6c33e9 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/JmapEventSerializer.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/JmapEventSerializer.scala @@ -23,11 +23,10 @@ import java.util import java.util.Optional import com.fasterxml.jackson.annotation.JsonProperty -import com.google.common.collect.ImmutableList import jakarta.inject.Inject import org.apache.james.core.Username import org.apache.james.events.Event.EventId -import org.apache.james.events.{Event, EventSerializer} +import org.apache.james.events.{DeserializationResult, Event, EventSerializer} import org.apache.james.jmap.api.change.TypeStateFactory import org.apache.james.jmap.api.model.{State, TypeName} import org.apache.james.jmap.core.UuidState @@ -87,11 +86,15 @@ case class JmapEventSerializer @Inject()(stateChangeEventDTOFactory: StateChange override def toJson(event: Event): Optional[String] = genericSerializer.maybeSerialize(event) - override def asEvent(serialized: String): Optional[Event] = genericSerializer.maybeDeserialize(serialized) + override def asEvent(serialized: String): DeserializationResult = DeserializationResult.of( + genericSerializer.maybeDeserialize(serialized), + "Could not deserialize event " + serialized) override def toJsonBytes(event: Event): Optional[Array[Byte]] = genericSerializer.maybeSerializeToBytes(event) - override def fromBytes(serialized: Array[Byte]): Optional[Event] = genericSerializer.maybeDeserializeFromBytes(serialized) + override def fromBytes(serialized: Array[Byte]): DeserializationResult = DeserializationResult.of( + genericSerializer.maybeDeserializeFromBytes(serialized), + "Could not deserialize event " + serialized.map(_.toChar).mkString) override def toJson(event: util.Collection[Event]): Optional[String] = { if (event.size() != 1) { @@ -100,6 +103,5 @@ case class JmapEventSerializer @Inject()(stateChangeEventDTOFactory: StateChange toJson(event.iterator().next()) } - override def asEvents(serialized: String): Optional[util.List[Event]] = - asEvent(serialized).map(event => ImmutableList.of(event)) + override def asEvents(serialized: String): DeserializationResult = asEvent(serialized) } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
