This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 6a8d9924e51 CAMEL-20121 reconnect SMPP session after receiving Unbound 
(#12046)
6a8d9924e51 is described below

commit 6a8d9924e51a4e7c370e51211de7f13ff4dd3f8d
Author: Christian Ambach <der-a...@users.noreply.github.com>
AuthorDate: Wed Jan 17 07:33:41 2024 +0100

    CAMEL-20121 reconnect SMPP session after receiving Unbound (#12046)
    
    * update version of jsmpp to 3.0.1
    
    Signed-off-by: Christian Ambach <a...@samba.org>
    
    * CAMEL-20121 reconnect SMPP session after receiving Unbind
    
    Signed-off-by: Christian Ambach <a...@samba.org>
    
    ---------
    
    Signed-off-by: Christian Ambach <a...@samba.org>
---
 .../apache/camel/component/smpp/SmppConsumer.java  |  7 ++--
 .../apache/camel/component/smpp/SmppProducer.java  |  6 ++-
 .../camel/component/smpp/SmppConsumerTest.java     | 45 +++++++++++++++++++++
 .../camel/component/smpp/SmppProducerTest.java     | 47 ++++++++++++++++++++++
 4 files changed, 100 insertions(+), 5 deletions(-)

diff --git 
a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
 
b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
index 6f35a09f6d5..d2502aa163f 100644
--- 
a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
+++ 
b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
@@ -75,10 +75,11 @@ public class SmppConsumer extends DefaultConsumer {
                 
configuration.getSessionStateListener().onStateChange(newState, oldState, 
source);
             }
 
-            if (newState.equals(SessionState.CLOSED)) {
-                LOG.warn("Lost connection to: {} - trying to reconnect...", 
getEndpoint().getConnectionString());
+            if (newState.equals(SessionState.UNBOUND) || 
newState.equals(SessionState.CLOSED)) {
+                LOG.warn(newState.equals(SessionState.UNBOUND)
+                        ? "Session to {} was unbound - trying to reconnect" : 
"Lost connection to: {} - trying to reconnect...",
+                        getEndpoint().getConnectionString());
                 closeSession();
-
                 reconnect(configuration.getInitialReconnectDelay());
             }
         };
diff --git 
a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
 
b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
index 00e304eaa46..b8841fad205 100644
--- 
a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
+++ 
b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
@@ -71,8 +71,10 @@ public class SmppProducer extends DefaultProducer {
                 
configuration.getSessionStateListener().onStateChange(newState, oldState, 
source);
             }
 
-            if (newState.equals(SessionState.CLOSED)) {
-                LOG.warn("Lost connection to: {} - trying to reconnect...", 
getEndpoint().getConnectionString());
+            if (newState.equals(SessionState.UNBOUND) || 
newState.equals(SessionState.CLOSED)) {
+                LOG.warn(newState.equals(SessionState.UNBOUND)
+                        ? "Session to {} was unbound - trying to reconnect" : 
"Lost connection to: {} - trying to reconnect...",
+                        getEndpoint().getConnectionString());
                 closeSession();
                 reconnect(configuration.getInitialReconnectDelay());
             }
diff --git 
a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppConsumerTest.java
 
b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppConsumerTest.java
index e659fc917b8..9ff39852988 100644
--- 
a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppConsumerTest.java
+++ 
b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppConsumerTest.java
@@ -16,24 +16,37 @@
  */
 package org.apache.camel.component.smpp;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Processor;
 import org.apache.camel.spi.ExchangeFactory;
+import org.apache.camel.support.task.BackgroundTask;
+import org.apache.camel.support.task.budget.Budgets;
+import org.apache.camel.util.ReflectionHelper;
 import org.jsmpp.bean.BindType;
 import org.jsmpp.bean.NumberingPlanIndicator;
 import org.jsmpp.bean.TypeOfNumber;
+import org.jsmpp.extra.SessionState;
 import org.jsmpp.session.BindParameter;
 import org.jsmpp.session.MessageReceiverListener;
 import org.jsmpp.session.SMPPSession;
 import org.jsmpp.session.SessionStateListener;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.MockedStatic;
 
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -156,4 +169,36 @@ public class SmppConsumerTest {
         assertSame(endpoint, consumer.getEndpoint());
         assertSame(configuration, consumer.getConfiguration());
     }
+
+    @ParameterizedTest
+    @EnumSource(value = SessionState.class, names = { "UNBOUND", "CLOSED" })
+    public void 
internalSessionStateListenerShouldCloseSessionAndReconnect(SessionState 
sessionState) throws Exception {
+        try (MockedStatic<SmppUtils> smppUtilsMock = 
mockStatic(SmppUtils.class)) {
+            SessionStateListener sessionStateListener = (SessionStateListener) 
ReflectionHelper
+                    
.getField(SmppConsumer.class.getDeclaredField("internalSessionStateListener"), 
consumer);
+            ScheduledExecutorService reconnectService = 
(ScheduledExecutorService) ReflectionHelper
+                    
.getField(SmppConsumer.class.getDeclaredField("reconnectService"), consumer);
+            when(endpoint.getConnectionString())
+                    .thenReturn("smpp://smppclient@localhost:2775");
+            BindParameter expectedBindParameter = new BindParameter(
+                    BindType.BIND_RX,
+                    "smppclient",
+                    "password",
+                    "cp",
+                    TypeOfNumber.UNKNOWN,
+                    NumberingPlanIndicator.UNKNOWN,
+                    "");
+            when(session.connectAndBind("localhost", Integer.valueOf(2775), 
expectedBindParameter))
+                    .thenReturn("1");
+            smppUtilsMock.when(() -> SmppUtils.newReconnectTask(any(), 
anyString(), anyLong(), anyLong(), anyInt()))
+                    .thenReturn(new 
BackgroundTask.BackgroundTaskBuilder().withScheduledExecutor(reconnectService)
+                            .withBudget(Budgets.timeBudget().build()).build());
+
+            consumer.doStart();
+
+            sessionStateListener.onStateChange(sessionState, 
SessionState.BOUND_RX, null);
+            verify(session).unbindAndClose();
+        }
+    }
+
 }
diff --git 
a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppProducerTest.java
 
b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppProducerTest.java
index ebf5b0491b0..d855a136966 100644
--- 
a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppProducerTest.java
+++ 
b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppProducerTest.java
@@ -16,20 +16,34 @@
  */
 package org.apache.camel.component.smpp;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 import org.apache.camel.Exchange;
+import org.apache.camel.support.task.BackgroundTask;
+import org.apache.camel.support.task.budget.Budgets;
+import org.apache.camel.util.ReflectionHelper;
 import org.jsmpp.bean.BindType;
 import org.jsmpp.bean.InterfaceVersion;
 import org.jsmpp.bean.NumberingPlanIndicator;
 import org.jsmpp.bean.TypeOfNumber;
+import org.jsmpp.extra.SessionState;
 import org.jsmpp.session.BindParameter;
 import org.jsmpp.session.SMPPSession;
 import org.jsmpp.session.SessionStateListener;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.MockedStatic;
 
 import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -126,4 +140,37 @@ public class SmppProducerTest {
         assertSame(endpoint, producer.getEndpoint());
         assertSame(configuration, producer.getConfiguration());
     }
+
+    @ParameterizedTest
+    @EnumSource(value = SessionState.class, names = { "UNBOUND", "CLOSED" })
+    public void 
internalSessionStateListenerShouldCloseSessionAndReconnect(SessionState 
sessionState) throws Exception {
+        try (MockedStatic<SmppUtils> smppUtilsMock = 
mockStatic(SmppUtils.class)) {
+            ScheduledExecutorService reconnectService = 
(ScheduledExecutorService) ReflectionHelper
+                    
.getField(SmppProducer.class.getDeclaredField("reconnectService"), producer);
+            SessionStateListener sessionStateListener = (SessionStateListener) 
ReflectionHelper
+                    
.getField(SmppProducer.class.getDeclaredField("internalSessionStateListener"), 
producer);
+            when(endpoint.getConnectionString())
+                    .thenReturn("smpp://smppclient@localhost:2775");
+            BindParameter expectedBindParameters = new BindParameter(
+                    BindType.BIND_TX,
+                    "smppclient",
+                    "password",
+                    "cp",
+                    TypeOfNumber.UNKNOWN,
+                    NumberingPlanIndicator.UNKNOWN,
+                    "",
+                    InterfaceVersion.IF_50);
+            when(session.connectAndBind("localhost", Integer.valueOf(2775), 
expectedBindParameters))
+                    .thenReturn("1");
+            when(endpoint.isSingleton()).thenReturn(true);
+            smppUtilsMock.when(() -> SmppUtils.newReconnectTask(any(), 
anyString(), anyLong(), anyLong(), anyInt()))
+                    .thenReturn(new 
BackgroundTask.BackgroundTaskBuilder().withScheduledExecutor(reconnectService)
+                            .withBudget(Budgets.timeBudget().build()).build());
+
+            producer.doStart();
+
+            sessionStateListener.onStateChange(SessionState.CLOSED, 
SessionState.BOUND_TX, null);
+            verify(session).unbindAndClose();
+        }
+    }
 }

Reply via email to