Author: btellier Date: Sat Nov 28 13:06:33 2015 New Revision: 1716963 URL: http://svn.apache.org/viewvc?rev=1716963&view=rev Log: Adding a Broadcast based event system
Added: james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/BroadcastDelegatingMailboxListener.java james/project/trunk/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/distributed/BroadcastDelegatingMailboxListenerTest.java Added: james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/BroadcastDelegatingMailboxListener.java URL: http://svn.apache.org/viewvc/james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/BroadcastDelegatingMailboxListener.java?rev=1716963&view=auto ============================================================================== --- james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/BroadcastDelegatingMailboxListener.java (added) +++ james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/BroadcastDelegatingMailboxListener.java Sat Nov 28 13:06:33 2015 @@ -0,0 +1,136 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.mailbox.store.event.distributed; + +import org.apache.james.mailbox.MailboxListener; +import org.apache.james.mailbox.MailboxSession; +import org.apache.james.mailbox.exception.MailboxException; +import org.apache.james.mailbox.model.MailboxPath; +import org.apache.james.mailbox.store.event.EventSerializer; +import org.apache.james.mailbox.store.event.MailboxListenerRegistry; +import org.apache.james.mailbox.store.publisher.MessageConsumer; +import org.apache.james.mailbox.store.publisher.Publisher; +import org.apache.james.mailbox.store.publisher.Topic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; + +public class BroadcastDelegatingMailboxListener implements DistributedDelegatingMailboxListener { + + private final static Logger LOGGER = LoggerFactory.getLogger(BroadcastDelegatingMailboxListener.class); + + private final MailboxListenerRegistry mailboxListenerRegistry; + private final Publisher publisher; + private final EventSerializer eventSerializer; + private final Topic globalTopic; + + public BroadcastDelegatingMailboxListener(Publisher publisher, + MessageConsumer messageConsumer, + EventSerializer eventSerializer, + String globalTopic) throws Exception { + this.mailboxListenerRegistry = new MailboxListenerRegistry(); + this.publisher = publisher; + this.eventSerializer = eventSerializer; + this.globalTopic = new Topic(globalTopic); + messageConsumer.setMessageReceiver(this); + messageConsumer.init(this.globalTopic); + } + + @Override + public ListenerType getType() { + return ListenerType.ONCE; + } + + @Override + public void addListener(MailboxPath mailboxPath, MailboxListener listener, MailboxSession session) throws MailboxException { + mailboxListenerRegistry.addListener(mailboxPath, listener); + } + + @Override + public void removeListener(MailboxPath mailboxPath, MailboxListener listener, MailboxSession 
session) throws MailboxException { + mailboxListenerRegistry.removeListener(mailboxPath, listener); + } + + @Override + public void addGlobalListener(MailboxListener listener, MailboxSession session) throws MailboxException { + mailboxListenerRegistry.addGlobalListener(listener); + } + + @Override + public void removeGlobalListener(MailboxListener listener, MailboxSession session) throws MailboxException { + mailboxListenerRegistry.removeGlobalListener(listener); + } + + @Override + public void event(Event event) { + deliverEventToGlobalListeners(event, ListenerType.ONCE); + try { + publisher.publish(globalTopic, eventSerializer.serializeEvent(event)); + } catch (Throwable t) { + event.getSession().getLog().error("Error while sending event to publisher", t); + } + } + + public void receiveSerializedEvent(byte[] serializedEvent) { + try { + Event event = eventSerializer.deSerializeEvent(serializedEvent); + deliverToMailboxPathRegisteredListeners(event); + deliverEventToGlobalListeners(event, ListenerType.EACH_NODE); + } catch (Exception e) { + LOGGER.error("Error while receiving serialized event", e); + } + } + + private void deliverToMailboxPathRegisteredListeners(Event event) { + Collection<MailboxListener> listenerSnapshot = mailboxListenerRegistry.getLocalMailboxListeners(event.getMailboxPath()); + if (event instanceof MailboxDeletion) { + mailboxListenerRegistry.deleteRegistryFor(event.getMailboxPath()); + } else if (event instanceof MailboxRenamed) { + MailboxRenamed renamed = (MailboxRenamed) event; + mailboxListenerRegistry.handleRename(renamed.getMailboxPath(), renamed.getNewPath()); + } + for (MailboxListener listener : listenerSnapshot) { + deliverEvent(event, listener); + } + } + + private void deliverEventToGlobalListeners(Event event, ListenerType type) { + for (MailboxListener mailboxListener : mailboxListenerRegistry.getGlobalListeners()) { + if (mailboxListener.getType() == type) { + deliverEvent(event, mailboxListener); + } + } + } + + private void 
deliverEvent(Event event, MailboxListener listener) { + try { + listener.event(event); + } catch(Throwable throwable) { + event.getSession() + .getLog() + .error("Error while processing listener " + + listener.getClass().getCanonicalName() + + " for " + + event.getClass().getCanonicalName(), + throwable); + } + } +} Added: james/project/trunk/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/distributed/BroadcastDelegatingMailboxListenerTest.java URL: http://svn.apache.org/viewvc/james/project/trunk/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/distributed/BroadcastDelegatingMailboxListenerTest.java?rev=1716963&view=auto ============================================================================== --- james/project/trunk/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/distributed/BroadcastDelegatingMailboxListenerTest.java (added) +++ james/project/trunk/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/distributed/BroadcastDelegatingMailboxListenerTest.java Sat Nov 28 13:06:33 2015 @@ -0,0 +1,228 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.mailbox.store.event.distributed; + +import static org.assertj.core.api.Assertions.assertThat; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import org.apache.james.mailbox.MailboxListener; +import org.apache.james.mailbox.MailboxSession; +import org.apache.james.mailbox.mock.MockMailboxSession; +import org.apache.james.mailbox.model.MailboxPath; +import org.apache.james.mailbox.store.event.EventSerializer; +import org.apache.james.mailbox.store.publisher.MessageConsumer; +import org.apache.james.mailbox.store.publisher.Publisher; +import org.apache.james.mailbox.store.publisher.Topic; +import org.apache.james.mailbox.util.EventCollector; +import org.junit.Before; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public class BroadcastDelegatingMailboxListenerTest { + + private static final MailboxPath MAILBOX_PATH = new MailboxPath("namespace", "user", "name"); + private static final MailboxPath MAILBOX_PATH_NEW = new MailboxPath("namespace_new", "user_new", "name_new"); + private static final Topic TOPIC = new Topic("topic"); + private static final byte[] BYTES = new byte[0]; + private static final MailboxSession mailboxSession = new MockMailboxSession("benwa"); + + public static final MailboxListener.Event EVENT = new MailboxListener.Event(mailboxSession, MAILBOX_PATH) {}; + + private BroadcastDelegatingMailboxListener broadcastDelegatingMailboxListener; + private Publisher mockedPublisher; + private EventSerializer mockedEventSerializer; + private EventCollector mailboxEventCollector; + private EventCollector eachEventCollector; + private EventCollector onceEventCollector; + + @Before + public void setUp() throws Exception { + mockedEventSerializer = 
mock(EventSerializer.class); + mockedPublisher = mock(Publisher.class); + MessageConsumer messageConsumer = mock(MessageConsumer.class); + broadcastDelegatingMailboxListener = new BroadcastDelegatingMailboxListener(mockedPublisher, messageConsumer, mockedEventSerializer, TOPIC.getValue()); + mailboxEventCollector = new EventCollector(MailboxListener.ListenerType.MAILBOX); + eachEventCollector = new EventCollector(MailboxListener.ListenerType.EACH_NODE); + onceEventCollector = new EventCollector(MailboxListener.ListenerType.ONCE); + } + + @Test + public void eventWithNoRegisteredListenersShouldWork() throws Exception { + when(mockedEventSerializer.serializeEvent(EVENT)).thenAnswer(new Answer<byte[]>() { + @Override + public byte[] answer(InvocationOnMock invocation) throws Throwable { + return BYTES; + } + }); + broadcastDelegatingMailboxListener.event(EVENT); + verify(mockedEventSerializer).serializeEvent(EVENT); + verify(mockedPublisher).publish(TOPIC, BYTES); + verifyNoMoreInteractions(mockedEventSerializer, mockedPublisher); + } + + @Test + public void eventWithMailboxRegisteredListenerShouldWork() throws Exception { + broadcastDelegatingMailboxListener.addListener(MAILBOX_PATH, mailboxEventCollector, mailboxSession); + when(mockedEventSerializer.serializeEvent(EVENT)).thenAnswer(new Answer<byte[]>() { + @Override + public byte[] answer(InvocationOnMock invocation) throws Throwable { + return BYTES; + } + }); + broadcastDelegatingMailboxListener.event(EVENT); + assertThat(mailboxEventCollector.getEvents()).isEmpty(); + verify(mockedEventSerializer).serializeEvent(EVENT); + verify(mockedPublisher).publish(TOPIC, BYTES); + verifyNoMoreInteractions(mockedEventSerializer, mockedPublisher); + } + + @Test + public void eventWithEachRegisteredListenerShouldWork() throws Exception { + broadcastDelegatingMailboxListener.addGlobalListener(eachEventCollector, mailboxSession); + when(mockedEventSerializer.serializeEvent(EVENT)).thenAnswer(new Answer<byte[]>() { + @Override 
+ public byte[] answer(InvocationOnMock invocation) throws Throwable { + return BYTES; + } + }); + broadcastDelegatingMailboxListener.event(EVENT); + assertThat(eachEventCollector.getEvents()).isEmpty(); + verify(mockedEventSerializer).serializeEvent(EVENT); + verify(mockedPublisher).publish(TOPIC, BYTES); + verifyNoMoreInteractions(mockedEventSerializer, mockedPublisher); + } + + @Test + public void eventWithOnceRegisteredListenerShouldWork() throws Exception { + broadcastDelegatingMailboxListener.addGlobalListener(onceEventCollector, mailboxSession); + when(mockedEventSerializer.serializeEvent(EVENT)).thenAnswer(new Answer<byte[]>() { + @Override + public byte[] answer(InvocationOnMock invocation) throws Throwable { + return BYTES; + } + }); + broadcastDelegatingMailboxListener.event(EVENT); + assertThat(onceEventCollector.getEvents()).containsOnly(EVENT); + verify(mockedEventSerializer).serializeEvent(EVENT); + verify(mockedPublisher).publish(TOPIC, BYTES); + verifyNoMoreInteractions(mockedEventSerializer, mockedPublisher); + } + + @Test + public void receiveSerializedEventShouldWorkWithNoRegisteredListeners() throws Exception { + when(mockedEventSerializer.deSerializeEvent(BYTES)).thenAnswer(new Answer<MailboxListener.Event>() { + @Override + public MailboxListener.Event answer(InvocationOnMock invocation) throws Throwable { + return EVENT; + } + }); + broadcastDelegatingMailboxListener.receiveSerializedEvent(BYTES); + verify(mockedEventSerializer).deSerializeEvent(BYTES); + verifyNoMoreInteractions(mockedEventSerializer, mockedPublisher); + } + + @Test + public void receiveSerializedEventShouldWorkWithMailboxRegisteredListeners() throws Exception { + broadcastDelegatingMailboxListener.addListener(MAILBOX_PATH, mailboxEventCollector, mailboxSession); + when(mockedEventSerializer.deSerializeEvent(BYTES)).thenAnswer(new Answer<MailboxListener.Event>() { + @Override + public MailboxListener.Event answer(InvocationOnMock invocation) throws Throwable { + return 
EVENT; + } + }); + broadcastDelegatingMailboxListener.receiveSerializedEvent(BYTES); + verify(mockedEventSerializer).deSerializeEvent(BYTES); + verifyNoMoreInteractions(mockedEventSerializer, mockedPublisher); + assertThat(mailboxEventCollector.getEvents()).containsOnly(EVENT); + } + + @Test + public void receiveSerializedEventShouldWorkWithEachRegisteredListeners() throws Exception { + broadcastDelegatingMailboxListener.addGlobalListener(eachEventCollector, mailboxSession); + when(mockedEventSerializer.deSerializeEvent(BYTES)).thenAnswer(new Answer<MailboxListener.Event>() { + @Override + public MailboxListener.Event answer(InvocationOnMock invocation) throws Throwable { + return EVENT; + } + }); + broadcastDelegatingMailboxListener.receiveSerializedEvent(BYTES); + verify(mockedEventSerializer).deSerializeEvent(BYTES); + verifyNoMoreInteractions(mockedEventSerializer, mockedPublisher); + assertThat(eachEventCollector.getEvents()).containsOnly(EVENT); + } + + @Test + public void receiveSerializedEventShouldWorkWithOnceRegisteredListeners() throws Exception { + broadcastDelegatingMailboxListener.addGlobalListener(onceEventCollector, mailboxSession); + when(mockedEventSerializer.deSerializeEvent(BYTES)).thenAnswer(new Answer<MailboxListener.Event>() { + @Override + public MailboxListener.Event answer(InvocationOnMock invocation) throws Throwable { + return EVENT; + } + }); + broadcastDelegatingMailboxListener.receiveSerializedEvent(BYTES); + verify(mockedEventSerializer).deSerializeEvent(BYTES); + verifyNoMoreInteractions(mockedEventSerializer, mockedPublisher); + assertThat(onceEventCollector.getEvents()).isEmpty(); + } + + @Test + public void deletionDistantEventsShouldBeWellHandled() throws Exception { + final MailboxListener.Event event = new MailboxListener.MailboxDeletion(mailboxSession, MAILBOX_PATH); + broadcastDelegatingMailboxListener.addListener(MAILBOX_PATH, mailboxEventCollector, mailboxSession); + 
when(mockedEventSerializer.deSerializeEvent(BYTES)).thenAnswer(new Answer<MailboxListener.Event>() { + @Override + public MailboxListener.Event answer(InvocationOnMock invocation) throws Throwable { + return event; + } + }); + broadcastDelegatingMailboxListener.receiveSerializedEvent(BYTES); + verify(mockedEventSerializer).deSerializeEvent(BYTES); + verifyNoMoreInteractions(mockedEventSerializer, mockedPublisher); + assertThat(mailboxEventCollector.getEvents()).containsOnly(event); + } + + @Test + public void renameDistantEventsShouldBeWellHandled() throws Exception { + final MailboxListener.Event event = new MailboxListener.MailboxRenamed(mailboxSession, MAILBOX_PATH) { + @Override + public MailboxPath getNewPath() { + return MAILBOX_PATH_NEW; + } + }; + when(mockedEventSerializer.deSerializeEvent(BYTES)).thenAnswer(new Answer<MailboxListener.Event>() { + @Override + public MailboxListener.Event answer(InvocationOnMock invocation) throws Throwable { + return event; + } + }); + broadcastDelegatingMailboxListener.addListener(MAILBOX_PATH, mailboxEventCollector, mailboxSession); + broadcastDelegatingMailboxListener.receiveSerializedEvent(BYTES); + verify(mockedEventSerializer).deSerializeEvent(BYTES); + verifyNoMoreInteractions(mockedEventSerializer, mockedPublisher); + assertThat(mailboxEventCollector.getEvents()).containsOnly(event); + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org