Author: cmueller
Date: Thu Feb 10 20:51:39 2011
New Revision: 1069564

URL: http://svn.apache.org/viewvc?rev=1069564&view=rev
Log:
CAMEL-3650: SMSC-initiated unbind spawns an exponentially growing number of
reconnect threads

Modified:
    camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java

Modified: camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java?rev=1069564&r1=1069563&r2=1069564&view=diff
==============================================================================
--- camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java (original)
+++ camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java Thu Feb 10 20:51:39 2011
@@ -17,6 +17,7 @@
 package org.apache.camel.component.smpp;
 
 import java.io.IOException;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -59,6 +60,7 @@ public class SmppConsumer extends Defaul
     private SMPPSession session;
     private MessageReceiverListener messageReceiverListener;
     private SessionStateListener sessionStateListener;
+    private final ReentrantLock reconnectLock = new ReentrantLock();
 
     /**
      * The constructor which gets a smpp endpoint, a smpp configuration and a
@@ -198,32 +200,44 @@ public class SmppConsumer extends Defaul
     }
 
     private void reconnect(final long initialReconnectDelay) {
-        new Thread() {
-            @Override
-            public void run() {
-                LOG.info("Schedule reconnect after " + initialReconnectDelay + 
" millis");
-                try {
-                    Thread.sleep(initialReconnectDelay);
-                } catch (InterruptedException e) {
-                }
-
-                int attempt = 0;
-                while (!(isStopping() || isStopped()) && (session == null || 
session.getSessionState().equals(SessionState.CLOSED))) {
+        if (reconnectLock.tryLock()) {
+            new Thread() {
+                @Override
+                public void run() {
                     try {
-                        LOG.info("Trying to reconnect to " + 
getEndpoint().getConnectionString() + " - attempt #" + (++attempt) + "...");
-                        session = createSession();
-                    } catch (IOException e) {
-                        LOG.info("Failed to reconnect to " + 
getEndpoint().getConnectionString());
-                        closeSession(session);
+                        boolean reconnected = false;
+                        
+                        LOG.info("Schedule reconnect after " + 
initialReconnectDelay + " millis");
                         try {
-                            Thread.sleep(configuration.getReconnectDelay());
-                        } catch (InterruptedException ee) {
+                            Thread.sleep(initialReconnectDelay);
+                        } catch (InterruptedException e) {
+                        }
+
+                        int attempt = 0;
+                        while (!(isStopping() || isStopped()) && (session == 
null || session.getSessionState().equals(SessionState.CLOSED))) {
+                            try {
+                                LOG.info("Trying to reconnect to " + 
getEndpoint().getConnectionString() + " - attempt #" + (++attempt) + "...");
+                                session = createSession();
+                                reconnected = true;
+                            } catch (IOException e) {
+                                LOG.info("Failed to reconnect to " + 
getEndpoint().getConnectionString());
+                                closeSession(session);
+                                try {
+                                    
Thread.sleep(configuration.getReconnectDelay());
+                                } catch (InterruptedException ee) {
+                                }
+                            }
+                        }
+                        
+                        if (reconnected) {
+                            LOG.info("Reconnected to " + 
getEndpoint().getConnectionString());                        
                         }
+                    } finally {
+                        reconnectLock.unlock();
                     }
                 }
-                LOG.info("Reconnected to " + 
getEndpoint().getConnectionString());
-            }
-        }.start();
+            }.start();            
+        }
     }
 
     @Override


Reply via email to