Author: cmueller
Date: Thu Feb 10 20:51:39 2011
New Revision: 1069564

URL: http://svn.apache.org/viewvc?rev=1069564&view=rev
Log:
CAMEL-3650: SMSC initiated unbind spawns exponential amounts of reconnect threads

Modified:
    camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java

Modified: camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java?rev=1069564&r1=1069563&r2=1069564&view=diff
==============================================================================
--- camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java (original)
+++ camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java Thu Feb 10 20:51:39 2011
@@ -17,6 +17,7 @@
 package org.apache.camel.component.smpp;
 
 import java.io.IOException;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -59,6 +60,7 @@ public class SmppConsumer extends Defaul
     private SMPPSession session;
     private MessageReceiverListener messageReceiverListener;
     private SessionStateListener sessionStateListener;
+    private final ReentrantLock reconnectLock = new ReentrantLock();
 
     /**
      * The constructor which gets a smpp endpoint, a smpp configuration and a
@@ -198,32 +200,44 @@ public class SmppConsumer extends Defaul
     }
 
     private void reconnect(final long initialReconnectDelay) {
-        new Thread() {
-            @Override
-            public void run() {
-                LOG.info("Schedule reconnect after " + initialReconnectDelay + " millis");
-                try {
-                    Thread.sleep(initialReconnectDelay);
-                } catch (InterruptedException e) {
-                }
-
-                int attempt = 0;
-                while (!(isStopping() || isStopped()) && (session == null || session.getSessionState().equals(SessionState.CLOSED))) {
+        if (reconnectLock.tryLock()) {
+            new Thread() {
+                @Override
+                public void run() {
                     try {
-                        LOG.info("Trying to reconnect to " + getEndpoint().getConnectionString() + " - attempt #" + (++attempt) + "...");
-                        session = createSession();
-                    } catch (IOException e) {
-                        LOG.info("Failed to reconnect to " + getEndpoint().getConnectionString());
-                        closeSession(session);
+                        boolean reconnected = false;
+
+                        LOG.info("Schedule reconnect after " + initialReconnectDelay + " millis");
                         try {
-                            Thread.sleep(configuration.getReconnectDelay());
-                        } catch (InterruptedException ee) {
+                            Thread.sleep(initialReconnectDelay);
+                        } catch (InterruptedException e) {
+                        }
+
+                        int attempt = 0;
+                        while (!(isStopping() || isStopped()) && (session == null || session.getSessionState().equals(SessionState.CLOSED))) {
+                            try {
+                                LOG.info("Trying to reconnect to " + getEndpoint().getConnectionString() + " - attempt #" + (++attempt) + "...");
+                                session = createSession();
+                                reconnected = true;
+                            } catch (IOException e) {
+                                LOG.info("Failed to reconnect to " + getEndpoint().getConnectionString());
+                                closeSession(session);
+                                try {
+                                    Thread.sleep(configuration.getReconnectDelay());
+                                } catch (InterruptedException ee) {
+                                }
+                            }
+                        }
+
+                        if (reconnected) {
+                            LOG.info("Reconnected to " + getEndpoint().getConnectionString());
                         }
+                    } finally {
+                        reconnectLock.unlock();
                     }
                 }
-                LOG.info("Reconnected to " + getEndpoint().getConnectionString());
-            }
-        }.start();
+            }.start();
+        }
     }
 
     @Override