This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 6a8d9924e51 CAMEL-20121 reconnect SMPP session after receiving Unbound (#12046) 6a8d9924e51 is described below commit 6a8d9924e51a4e7c370e51211de7f13ff4dd3f8d Author: Christian Ambach <der-a...@users.noreply.github.com> AuthorDate: Wed Jan 17 07:33:41 2024 +0100 CAMEL-20121 reconnect SMPP session after receiving Unbound (#12046) * update version of jsmpp to 3.0.1 Signed-off-by: Christian Ambach <a...@samba.org> * CAMEL-20121 reconnect SMPP session after receiving Unbind Signed-off-by: Christian Ambach <a...@samba.org> --------- Signed-off-by: Christian Ambach <a...@samba.org> --- .../apache/camel/component/smpp/SmppConsumer.java | 7 ++-- .../apache/camel/component/smpp/SmppProducer.java | 6 ++- .../camel/component/smpp/SmppConsumerTest.java | 45 +++++++++++++++++++++ .../camel/component/smpp/SmppProducerTest.java | 47 ++++++++++++++++++++++ 4 files changed, 100 insertions(+), 5 deletions(-) diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java index 6f35a09f6d5..d2502aa163f 100644 --- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java +++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java @@ -75,10 +75,11 @@ public class SmppConsumer extends DefaultConsumer { configuration.getSessionStateListener().onStateChange(newState, oldState, source); } - if (newState.equals(SessionState.CLOSED)) { - LOG.warn("Lost connection to: {} - trying to reconnect...", getEndpoint().getConnectionString()); + if (newState.equals(SessionState.UNBOUND) || newState.equals(SessionState.CLOSED)) { + LOG.warn(newState.equals(SessionState.UNBOUND) + ? 
"Session to {} was unbound - trying to reconnect" : "Lost connection to: {} - trying to reconnect...", + getEndpoint().getConnectionString()); closeSession(); - reconnect(configuration.getInitialReconnectDelay()); } }; diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java index 00e304eaa46..b8841fad205 100644 --- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java +++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java @@ -71,8 +71,10 @@ public class SmppProducer extends DefaultProducer { configuration.getSessionStateListener().onStateChange(newState, oldState, source); } - if (newState.equals(SessionState.CLOSED)) { - LOG.warn("Lost connection to: {} - trying to reconnect...", getEndpoint().getConnectionString()); + if (newState.equals(SessionState.UNBOUND) || newState.equals(SessionState.CLOSED)) { + LOG.warn(newState.equals(SessionState.UNBOUND) + ? 
"Session to {} was unbound - trying to reconnect" : "Lost connection to: {} - trying to reconnect...", + getEndpoint().getConnectionString()); closeSession(); reconnect(configuration.getInitialReconnectDelay()); } diff --git a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppConsumerTest.java b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppConsumerTest.java index e659fc917b8..9ff39852988 100644 --- a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppConsumerTest.java +++ b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppConsumerTest.java @@ -16,24 +16,37 @@ */ package org.apache.camel.component.smpp; +import java.util.concurrent.ScheduledExecutorService; + import org.apache.camel.CamelContext; import org.apache.camel.ExtendedCamelContext; import org.apache.camel.Processor; import org.apache.camel.spi.ExchangeFactory; +import org.apache.camel.support.task.BackgroundTask; +import org.apache.camel.support.task.budget.Budgets; +import org.apache.camel.util.ReflectionHelper; import org.jsmpp.bean.BindType; import org.jsmpp.bean.NumberingPlanIndicator; import org.jsmpp.bean.TypeOfNumber; +import org.jsmpp.extra.SessionState; import org.jsmpp.session.BindParameter; import org.jsmpp.session.MessageReceiverListener; import org.jsmpp.session.SMPPSession; import org.jsmpp.session.SessionStateListener; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.mockito.MockedStatic; import static org.junit.jupiter.api.Assertions.assertSame; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.mock; +import static 
org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -156,4 +169,36 @@ public class SmppConsumerTest { assertSame(endpoint, consumer.getEndpoint()); assertSame(configuration, consumer.getConfiguration()); } + + @ParameterizedTest + @EnumSource(value = SessionState.class, names = { "UNBOUND", "CLOSED" }) + public void internalSessionStateListenerShouldCloseSessionAndReconnect(SessionState sessionState) throws Exception { + try (MockedStatic<SmppUtils> smppUtilsMock = mockStatic(SmppUtils.class)) { + SessionStateListener sessionStateListener = (SessionStateListener) ReflectionHelper + .getField(SmppConsumer.class.getDeclaredField("internalSessionStateListener"), consumer); + ScheduledExecutorService reconnectService = (ScheduledExecutorService) ReflectionHelper + .getField(SmppConsumer.class.getDeclaredField("reconnectService"), consumer); + when(endpoint.getConnectionString()) + .thenReturn("smpp://smppclient@localhost:2775"); + BindParameter expectedBindParameter = new BindParameter( + BindType.BIND_RX, + "smppclient", + "password", + "cp", + TypeOfNumber.UNKNOWN, + NumberingPlanIndicator.UNKNOWN, + ""); + when(session.connectAndBind("localhost", Integer.valueOf(2775), expectedBindParameter)) + .thenReturn("1"); + smppUtilsMock.when(() -> SmppUtils.newReconnectTask(any(), anyString(), anyLong(), anyLong(), anyInt())) + .thenReturn(new BackgroundTask.BackgroundTaskBuilder().withScheduledExecutor(reconnectService) + .withBudget(Budgets.timeBudget().build()).build()); + + consumer.doStart(); + + sessionStateListener.onStateChange(sessionState, SessionState.BOUND_RX, null); + verify(session).unbindAndClose(); + } + } + } diff --git a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppProducerTest.java b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppProducerTest.java index ebf5b0491b0..d855a136966 100644 --- 
a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppProducerTest.java +++ b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppProducerTest.java @@ -16,20 +16,34 @@ */ package org.apache.camel.component.smpp; +import java.util.concurrent.ScheduledExecutorService; + import org.apache.camel.Exchange; +import org.apache.camel.support.task.BackgroundTask; +import org.apache.camel.support.task.budget.Budgets; +import org.apache.camel.util.ReflectionHelper; import org.jsmpp.bean.BindType; import org.jsmpp.bean.InterfaceVersion; import org.jsmpp.bean.NumberingPlanIndicator; import org.jsmpp.bean.TypeOfNumber; +import org.jsmpp.extra.SessionState; import org.jsmpp.session.BindParameter; import org.jsmpp.session.SMPPSession; import org.jsmpp.session.SessionStateListener; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.mockito.MockedStatic; import static org.junit.jupiter.api.Assertions.assertSame; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -126,4 +140,37 @@ public class SmppProducerTest { assertSame(endpoint, producer.getEndpoint()); assertSame(configuration, producer.getConfiguration()); } + + @ParameterizedTest + @EnumSource(value = SessionState.class, names = { "UNBOUND", "CLOSED" }) + public void internalSessionStateListenerShouldCloseSessionAndReconnect(SessionState sessionState) throws Exception { + try (MockedStatic<SmppUtils> smppUtilsMock = mockStatic(SmppUtils.class)) { + ScheduledExecutorService 
reconnectService = (ScheduledExecutorService) ReflectionHelper + .getField(SmppProducer.class.getDeclaredField("reconnectService"), producer); + SessionStateListener sessionStateListener = (SessionStateListener) ReflectionHelper + .getField(SmppProducer.class.getDeclaredField("internalSessionStateListener"), producer); + when(endpoint.getConnectionString()) + .thenReturn("smpp://smppclient@localhost:2775"); + BindParameter expectedBindParameters = new BindParameter( + BindType.BIND_TX, + "smppclient", + "password", + "cp", + TypeOfNumber.UNKNOWN, + NumberingPlanIndicator.UNKNOWN, + "", + InterfaceVersion.IF_50); + when(session.connectAndBind("localhost", Integer.valueOf(2775), expectedBindParameters)) + .thenReturn("1"); + when(endpoint.isSingleton()).thenReturn(true); + smppUtilsMock.when(() -> SmppUtils.newReconnectTask(any(), anyString(), anyLong(), anyLong(), anyInt())) + .thenReturn(new BackgroundTask.BackgroundTaskBuilder().withScheduledExecutor(reconnectService) + .withBudget(Budgets.timeBudget().build()).build()); + + producer.doStart(); + + sessionStateListener.onStateChange(sessionState, SessionState.BOUND_TX, null); + verify(session).unbindAndClose(); + } + } }