Author: rajikak
Date: Thu Aug 1 22:03:03 2013
New Revision: 1509452
URL: http://svn.apache.org/r1509452
Log:
added initial ha implementations.
Added:
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportHABrokerEntry.java
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportHAEntry.java
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportReconnectHandler.java
Modified:
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportBuffers.java
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportConstant.java
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportListener.java
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/connectionfactory/AMQPTransportConnectionFactory.java
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/connectionfactory/AMQPTransportConnectionFactoryManager.java
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTaskFactory.java
synapse/trunk/java/modules/transports/optional/amqp/src/test/java/org/apache/synapse/tranport/amqp/AMQPTransportUtilsTest.java
Modified:
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportBuffers.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportBuffers.java?rev=1509452&r1=1509451&r2=1509452&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportBuffers.java
(original)
+++
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportBuffers.java
Thu Aug 1 22:03:03 2013
@@ -94,7 +94,7 @@ public class AMQPTransportBuffers {
// block if there is no messages
return requestBuffer.take();
} catch (InterruptedException e) {
- // ignore
+ Thread.currentThread().interrupt();
}
return null;
}
Modified:
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportConstant.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportConstant.java?rev=1509452&r1=1509451&r2=1509452&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportConstant.java
(original)
+++
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportConstant.java
Thu Aug 1 22:03:03 2013
@@ -164,29 +164,29 @@ public final class AMQPTransportConstant
* If a polling task encounter an exception due to some reason(most
probably due to broker
* outage) the number of milliseconds it should be suspended before next
re-try.
*/
- public static final String PARAMETER_INITIAL_RE_CONNECTION_DURATION =
- "transport.amqp.InitialReconnectDuration";
+ public static final String PARAM_INITIAL_RE_CONNECTION_DURATION =
+ "initial-reconnect-duration";
/**
* If the polling task fails again after the initial re-connection duration
- * {@link AMQPTransportConstant#PARAMETER_INITIAL_RE_CONNECTION_DURATION}
+ * {@link AMQPTransportConstant#PARAM_INITIAL_RE_CONNECTION_DURATION}
* next suspend duration will be calculated using this
- * (PARAMETER_RE_CONNECTION_PROGRESSION_FACTOR *
PARAMETER_INITIAL_RE_CONNECTION_DURATION).
+ * (PARAM_RE_CONNECTION_PROGRESSION_FACTOR *
PARAM_INITIAL_RE_CONNECTION_DURATION).
*/
- public static final String PARAMETER_RE_CONNECTION_PROGRESSION_FACTOR =
- "transport.amqp.ReconnectionProgressionFactor";
+ public static final String PARAM_RE_CONNECTION_PROGRESSION_FACTOR =
+ "reconnection-progression-factor";
/**
* The maximum duration to suspend the polling task in case of an error.
The current suspend
* duration will reach this
* value by following the series,
- * PARAMETER_RE_CONNECTION_PROGRESSION_FACTOR *
PARAMETER_INITIAL_RE_CONNECTION_DURATION.
+ * PARAM_RE_CONNECTION_PROGRESSION_FACTOR *
PARAM_INITIAL_RE_CONNECTION_DURATION.
* This upper bound is there
* because nobody wants to wait a long time until the next re-try if the
broker is alive.
*/
- public static final String PARAMETER_MAX_RE_CONNECTION_DURATION =
- "transport.amqp.MaximumReconnectionDuration";
+ public static final String PARAM_MAX_RE_CONNECTION_DURATION =
+ "maximum-reconnection-duration";
/**
Modified:
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportListener.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportListener.java?rev=1509452&r1=1509451&r2=1509452&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportListener.java
(original)
+++
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportListener.java
Thu Aug 1 22:03:03 2013
@@ -17,6 +17,7 @@ import org.apache.axis2.AxisFault;
import org.apache.axis2.transport.base.AbstractTransportListenerEx;
import
org.apache.synapse.transport.amqp.connectionfactory.AMQPTransportConnectionFactory;
import
org.apache.synapse.transport.amqp.connectionfactory.AMQPTransportConnectionFactoryManager;
+import org.apache.synapse.transport.amqp.ha.AMQPTransportReconnectHandler;
import org.apache.synapse.transport.amqp.pollingtask.AMQPTransportPollingTask;
import java.util.concurrent.ExecutorService;
@@ -40,6 +41,8 @@ public class AMQPTransportListener exten
private ExecutorService connectionFactoryES;
+ private AMQPTransportReconnectHandler haHandler;
+
@Override
protected void doInit() throws AxisFault {
@@ -58,6 +61,25 @@ public class AMQPTransportListener exten
AMQPTransportUtils.getIntProperty(AMQPTransportConstant.PARAM_WORKER_POOL_SIZE,
AMQPTransportConstant.WORKER_POOL_DEFAULT));
+
+ int initialReconnectDuration = AMQPTransportUtils.getIntProperty(
+ AMQPTransportConstant.PARAM_INITIAL_RE_CONNECTION_DURATION,
1000);
+
+ double reconnectionProgressionFactor =
AMQPTransportUtils.getDoubleProperty(
+ AMQPTransportConstant.PARAM_RE_CONNECTION_PROGRESSION_FACTOR,
2.0);
+
+ int maxReconnectionDuration = AMQPTransportUtils.getIntProperty(
+ AMQPTransportConstant.PARAM_MAX_RE_CONNECTION_DURATION, 1000 *
60 * 10);
+
+ haHandler = new AMQPTransportReconnectHandler(
+ connectionFactoryES,
+ maxReconnectionDuration,
+ reconnectionProgressionFactor,
+ initialReconnectDuration,
+ connectionFactoryManager);
+
+ new Thread(haHandler, "AMQP-HA-handler-task").start();
+
log.info("AMQP transport listener initializing..");
}
Modified:
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/connectionfactory/AMQPTransportConnectionFactory.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/connectionfactory/AMQPTransportConnectionFactory.java?rev=1509452&r1=1509451&r2=1509452&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/connectionfactory/AMQPTransportConnectionFactory.java
(original)
+++
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/connectionfactory/AMQPTransportConnectionFactory.java
Thu Aug 1 22:03:03 2013
@@ -26,7 +26,10 @@ import org.apache.synapse.transport.amqp
import org.apache.synapse.transport.amqp.AMQPTransportUtils;
import java.io.IOException;
-import java.util.Hashtable;
+import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -65,7 +68,7 @@ public class AMQPTransportConnectionFact
/**
* The list of parameters(see above) in the connection factory definition.
*/
- private Hashtable<String, String> parameters = new Hashtable<String,
String>();
+ private Map<String, String> parameters = new HashMap<String, String>();
/**
* The AMQP connection to the broker maintain per connection factory.
@@ -77,6 +80,21 @@ public class AMQPTransportConnectionFact
*/
private Channel channel = null;
+    /**
+     * Creates a connection factory directly from a parameter map instead of an axis2.xml
+     * definition, opening the broker connection and its channel immediately.
+     * <p>
+     * The supplied entries are copied into this factory's own parameter map so that
+     * {@link #getParameters()} reports the definition this factory was built from; code
+     * that re-creates a factory from {@code getParameters()} depends on that.
+     *
+     * @param parameters the connection factory parameters (connection URI, broker list, ...)
+     * @param es         the executor service handed to the AMQP connection
+     * @throws AMQPTransportException if the connection or the channel cannot be created;
+     *                                the message lists every supplied parameter
+     */
+    public AMQPTransportConnectionFactory(
+            Map<String, String> parameters,
+            ExecutorService es)
+            throws AMQPTransportException {
+        // Without this copy the 'parameters' field stays empty for factories built here.
+        this.parameters.putAll(parameters);
+        try {
+            connection = createConnection(es, parameters);
+            channel = createChannel(connection, parameters);
+        } catch (Exception e) {
+            // NOTE(review): the values are echoed verbatim; if the connection URI embeds
+            // credentials they end up in the exception message - confirm this is acceptable.
+            StringBuilder msg = new StringBuilder(
+                    "Could not initialize the connection factory with parameters\n");
+            for (Map.Entry<String, String> entry : parameters.entrySet()) {
+                msg.append(entry.getKey()).append(":").append(entry.getValue()).append("\n");
+            }
+            throw new AMQPTransportException(msg.toString(), e);
+        }
+    }
public AMQPTransportConnectionFactory(Parameter parameter, ExecutorService
es)
throws AMQPTransportException {
@@ -87,7 +105,7 @@ public class AMQPTransportConnectionFact
if (!(parameter.getValue() instanceof OMElement)) {
throw new AMQPTransportException("The connection factory '" +
parameter.getName() +
- "' is in valid. It's required to have the least
connection factory definition with '" +
+ "' is invalid. It's required to have the least
connection factory definition with '" +
AMQPTransportConstant.PARAMETER_CONNECTION_URI + "'
parameter. Example: \n" +
"\n<transportReceiver name=\"amqp\"
class=\"org.wso2.carbon.transports.amqp.AMQPTransportListener\">\n" +
" <parameter name=\"default\" locked=\"false\">\n" +
@@ -106,60 +124,8 @@ public class AMQPTransportConnectionFact
parameters.put(entry.getName(), (String) entry.getValue());
}
- ConnectionFactory connectionFactory = new ConnectionFactory();
-
connectionFactory.setUri(parameters.get(AMQPTransportConstant.PARAMETER_CONNECTION_URI));
-
- if (parameters.get(AMQPTransportConstant.PARAMETER_BROKER_LIST) !=
null) {
- Address[] addresses = AMQPTransportUtils.getAddressArray(
-
parameters.get(AMQPTransportConstant.PARAMETER_BROKER_LIST), ",", ':');
- connection = connectionFactory.newConnection(es, addresses);
- } else {
- connection = connectionFactory.newConnection(es);
- }
-
- if
(parameters.get(AMQPTransportConstant.PARAMETER_AMQP_CHANNEL_NUMBER) != null) {
- int index = 0;
- try {
- index = Integer.parseInt(parameters.get(
-
AMQPTransportConstant.PARAMETER_AMQP_CHANNEL_NUMBER));
- } catch (NumberFormatException e) {
- index = 1; // assume default,
- // fair dispatch see
http://www.rabbitmq.com/tutorials/tutorial-two-java.html
- }
- channel = connection.createChannel(index);
-
- } else {
- channel = connection.createChannel();
- }
-
-
- int prefetchSize = 1024;
- if
(parameters.get(AMQPTransportConstant.PARAMETER_CHANNEL_PREFETCH_SIZE) != null)
{
- try {
- prefetchSize = Integer.parseInt(
-
parameters.get(AMQPTransportConstant.PARAMETER_CHANNEL_PREFETCH_SIZE));
- } catch (NumberFormatException e) {
- prefetchSize = 1024; // assume default
- }
- }
-
- int prefetchCount = 0;
- if
(parameters.get(AMQPTransportConstant.PARAMETER_CHANNEL_PREFETCH_COUNT) !=
null) {
- try {
- prefetchCount = Integer.parseInt(
-
parameters.get(AMQPTransportConstant.PARAMETER_CHANNEL_PREFETCH_COUNT));
- channel.basicQos(prefetchCount);
- } catch (NumberFormatException e) {
- prefetchCount = 0; // assume default
- }
- }
-
- boolean useGlobally = false;
- if
(parameters.get(AMQPTransportConstant.PARAMETER_CHANNEL_QOS_GLOBAL) != null) {
- useGlobally = Boolean.parseBoolean(parameters.get(
- AMQPTransportConstant.PARAMETER_CHANNEL_QOS_GLOBAL));
- }
-
+ connection = createConnection(es, parameters);
+ channel = createChannel(connection, parameters);
} catch (Exception e) {
throw new AMQPTransportException("" +
@@ -199,6 +165,14 @@ public class AMQPTransportConnectionFact
}
/**
+ * Returns the AMQP connection held by this connection factory.
+ *
+ * @return the connection to the broker maintained by this connection factory
+ */
+ public Connection getConnection() {
+ return connection;
+ }
+
+ /**
* Return the name of this connection factory(the name given in axis2.xml)
*
* @return name of this connection factory
@@ -225,4 +199,64 @@ public class AMQPTransportConnectionFact
public Map<String, String> getParameters() {
return parameters;
}
-}
+
+    /**
+     * Opens a broker connection as described by the given parameters.
+     * <p>
+     * The connection URI is always applied. When a broker list is configured it is parsed
+     * into an address array (entries separated by ',' and host/port separated by ':') and
+     * passed to the connection attempt; otherwise the connection is opened from the URI alone.
+     *
+     * @param es         the executor service handed to the new connection
+     * @param parameters the connection factory parameters
+     * @return the newly opened connection
+     */
+    private Connection createConnection(ExecutorService es, Map<String, String> parameters)
+            throws IOException, URISyntaxException, NoSuchAlgorithmException,
+            KeyManagementException {
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setUri(parameters.get(AMQPTransportConstant.PARAMETER_CONNECTION_URI));
+
+        String brokerList = parameters.get(AMQPTransportConstant.PARAMETER_BROKER_LIST);
+        if (brokerList == null) {
+            return factory.newConnection(es);
+        }
+
+        Address[] brokers = AMQPTransportUtils.getAddressArray(brokerList, ",", ':');
+        return factory.newConnection(es, brokers);
+    }
+
+    /**
+     * Opens a channel on the given connection and applies the configured prefetch count.
+     * <p>
+     * If a channel number is configured the channel is opened with that number (falling
+     * back to 1 when the value is not a valid integer); otherwise the next available
+     * channel is opened. A prefetch count, when configured and numeric, is applied through
+     * {@code basicQos}; a non-numeric value is ignored.
+     * <p>
+     * NOTE(review): {@code PARAMETER_CHANNEL_PREFETCH_SIZE} and
+     * {@code PARAMETER_CHANNEL_QOS_GLOBAL} used to be parsed here but the parsed values
+     * were never passed to the channel, so that dead parsing has been dropped. These two
+     * settings currently have no effect - wire them into {@code basicQos} if they are
+     * meant to be honoured.
+     *
+     * @param connection the connection to open the channel on
+     * @param parameters the connection factory parameters
+     * @return the channel as returned by the connection
+     * @throws IOException if the channel cannot be opened or the QoS setting is rejected
+     */
+    private Channel createChannel(Connection connection, Map<String, String> parameters)
+            throws IOException {
+        Channel ch;
+        String channelNumber =
+                parameters.get(AMQPTransportConstant.PARAMETER_AMQP_CHANNEL_NUMBER);
+        if (channelNumber != null) {
+            int index;
+            try {
+                index = Integer.parseInt(channelNumber);
+            } catch (NumberFormatException e) {
+                index = 1; // assume default
+            }
+            ch = connection.createChannel(index);
+        } else {
+            ch = connection.createChannel();
+        }
+
+        String prefetchCount =
+                parameters.get(AMQPTransportConstant.PARAMETER_CHANNEL_PREFETCH_COUNT);
+        if (prefetchCount != null) {
+            try {
+                // fair dispatch, see http://www.rabbitmq.com/tutorials/tutorial-two-java.html
+                ch.basicQos(Integer.parseInt(prefetchCount));
+            } catch (NumberFormatException e) {
+                // not a number: keep the broker's default prefetch behaviour
+            }
+        }
+        return ch;
+    }
+}
\ No newline at end of file
Modified:
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/connectionfactory/AMQPTransportConnectionFactoryManager.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/connectionfactory/AMQPTransportConnectionFactoryManager.java?rev=1509452&r1=1509451&r2=1509452&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/connectionfactory/AMQPTransportConnectionFactoryManager.java
(original)
+++
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/connectionfactory/AMQPTransportConnectionFactoryManager.java
Thu Aug 1 22:03:03 2013
@@ -41,8 +41,8 @@ public class AMQPTransportConnectionFact
* Add the list of defined connection factories definition.
*
* @param transportInDescription The connection factory definition in
axis2.xml
- * @param es An instance of java.util.concurrent.ExecutorService to use
with AMQP connection
- * factory
+ * @param es An instance of
java.util.concurrent.ExecutorService to use with AMQP connection
+ * factory
*/
public void addConnectionFactories(ParameterInclude
transportInDescription, ExecutorService es) {
for (Parameter p : transportInDescription.getParameters()) {
@@ -63,6 +63,10 @@ public class AMQPTransportConnectionFact
factories.put(parameter.getName(), new
AMQPTransportConnectionFactory(parameter, es));
}
+ /**
+ * Registers an already constructed connection factory under the given name.
+ * The factory is stored with a plain put, so a factory previously stored under
+ * the same name is replaced.
+ *
+ * @param name the name to store the connection factory under
+ * @param cf the connection factory to store
+ */
+ public void addConnectionFactory(String name,
AMQPTransportConnectionFactory cf) {
+ factories.put(name, cf);
+ }
+
/**
* Get the connection factory with this name.
*
@@ -112,4 +116,9 @@ public class AMQPTransportConnectionFact
throw new AMQPTransportException("Error occurred whiling shutting
down connections", e);
}
}
+
+ /**
+ * Returns all connection factories held by this manager, keyed by name.
+ * The manager's own map is returned, not a copy, so modifications made by
+ * the caller are visible to the manager.
+ *
+ * @return the map of connection factory name to connection factory
+ */
+ public ConcurrentHashMap<String, AMQPTransportConnectionFactory>
getAllFactories() {
+ return factories;
+ }
+
}
Added:
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportHABrokerEntry.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportHABrokerEntry.java?rev=1509452&view=auto
==============================================================================
---
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportHABrokerEntry.java
(added)
+++
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportHABrokerEntry.java
Thu Aug 1 22:03:03 2013
@@ -0,0 +1,51 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.synapse.transport.amqp.ha;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+
+/**
+ * Immutable holder that keeps an AMQP channel together with an AMQP connection.
+ * Both references are fixed at construction time and handed back unchanged by the getters.
+ */
+public class AMQPTransportHABrokerEntry {
+
+    /** The channel held by this entry. */
+    private final Channel channel;
+
+    /** The connection held by this entry. */
+    private final Connection connection;
+
+    /**
+     * Creates an entry holding the given channel and connection.
+     *
+     * @param channel    the channel to hold
+     * @param connection the connection to hold
+     */
+    public AMQPTransportHABrokerEntry(Channel channel, Connection connection) {
+        this.channel = channel;
+        this.connection = connection;
+    }
+
+    /**
+     * @return the channel given at construction time
+     */
+    public Channel getChannel() {
+        return channel;
+    }
+
+    /**
+     * @return the connection given at construction time
+     */
+    public Connection getConnection() {
+        return connection;
+    }
+}
Added:
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportHAEntry.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportHAEntry.java?rev=1509452&view=auto
==============================================================================
---
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportHAEntry.java
(added)
+++
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportHAEntry.java
Thu Aug 1 22:03:03 2013
@@ -0,0 +1,56 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.synapse.transport.amqp.ha;
+
+import java.util.concurrent.Semaphore;
+
+/**
+ * Immutable holder describing one requester of the reconnect handler: the semaphore the
+ * handler releases once it is done, the key under which the handler stores the resulting
+ * channel/connection pair, and the name of the connection factory to look up.
+ */
+public class AMQPTransportHAEntry {
+
+    /** Semaphore released by the reconnect handler after it has stored the new connection. */
+    private final Semaphore lock;
+
+    /** Key under which the reconnect handler stores the new channel/connection pair. */
+    private final String key;
+
+    /** Name of the connection factory this entry refers to. */
+    private final String connFacName;
+
+    /**
+     * Creates an entry.
+     *
+     * @param lock        the semaphore to release when the entry has been served
+     * @param key         the key identifying this entry
+     * @param connFacName the name of the connection factory this entry refers to
+     */
+    public AMQPTransportHAEntry(Semaphore lock, String key, String connFacName) {
+        this.lock = lock;
+        this.key = key;
+        this.connFacName = connFacName;
+    }
+
+    /**
+     * @return the semaphore given at construction time
+     */
+    public Semaphore getLock() {
+        return lock;
+    }
+
+    /**
+     * @return the key given at construction time
+     */
+    public String getKey() {
+        return key;
+    }
+
+    /**
+     * @return the connection factory name given at construction time
+     */
+    public String getConnectionFactoryName() {
+        return connFacName;
+    }
+}
Added:
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportReconnectHandler.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportReconnectHandler.java?rev=1509452&view=auto
==============================================================================
---
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportReconnectHandler.java
(added)
+++
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportReconnectHandler.java
Thu Aug 1 22:03:03 2013
@@ -0,0 +1,148 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.synapse.transport.amqp.ha;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.transport.amqp.AMQPTransportException;
+import
org.apache.synapse.transport.amqp.connectionfactory.AMQPTransportConnectionFactory;
+import
org.apache.synapse.transport.amqp.connectionfactory.AMQPTransportConnectionFactoryManager;
+
+import java.util.Map;
+import java.util.concurrent.*;
+
+/**
+ * Responsible for handling the shutdown signals gracefully. For example
+ * this provides the functionality for reconnecting to broker if broker
+ * when offline. The reconnection attempts happens in exponential back-off
+ * fashion.
+ */
+public class AMQPTransportReconnectHandler implements Runnable {
+
+ private BlockingQueue<AMQPTransportHAEntry> blockedTasks =
+ new LinkedBlockingQueue<AMQPTransportHAEntry>();
+
+ private ConcurrentMap<String, AMQPTransportHABrokerEntry> connectionMap =
+ new ConcurrentHashMap<String, AMQPTransportHABrokerEntry>();
+
+ private AMQPTransportConnectionFactoryManager connectionFactoryManager;
+
+ private int initialReconnectDuration = 1000;
+
+ private double reconnectionProgressionFactor = 2.0;
+
+ private int maxReconnectionDuration = 1000 * 60 * 10;
+
+ private ExecutorService es;
+
+ public AMQPTransportReconnectHandler(ExecutorService es,
+ int maxReconnectionDuration,
+ double reconnectionProgressionFactor,
+ int initialReconnectDuration,
+ AMQPTransportConnectionFactoryManager
+ connectionFactoryManager) {
+ this.es = es;
+ this.maxReconnectionDuration = maxReconnectionDuration;
+ this.reconnectionProgressionFactor = reconnectionProgressionFactor;
+ this.initialReconnectDuration = initialReconnectDuration;
+ this.connectionFactoryManager = connectionFactoryManager;
+ }
+
+ private static Log log =
LogFactory.getLog(AMQPTransportReconnectHandler.class);
+
+ public void run() {
+ try {
+ AMQPTransportHAEntry entry = blockedTasks.take();
+ if (entry != null) {
+ Map<String, String> params = connectionFactoryManager.
+
getConnectionFactory(entry.getConnectionFactoryName()).getParameters();
+ int count = 1;
+ long retryDuration = initialReconnectDuration;
+
+ while (true) {
+ try {
+ Thread.sleep(initialReconnectDuration);
+ new AMQPTransportConnectionFactory(params, es);
+ log.info("The reconnection attempt '" + count + "' was
successful");
+ break;
+ } catch (AMQPTransportException e) {
+ retryDuration = (long) (retryDuration *
reconnectionProgressionFactor);
+ if (retryDuration > maxReconnectionDuration) {
+ retryDuration = initialReconnectDuration;
+ log.info("The retry duration exceeded the maximum
reconnection duration." +
+ " The retry duration is set to initial
reconnection duration " +
+ "value(" + initialReconnectDuration +
"s)");
+ }
+ log.error("The reconnection attempt number '" +
count++ + "' failed. Next " +
+ "re-try will be after '" + (retryDuration /
1000) + "' seconds");
+ try {
+ Thread.sleep(retryDuration);
+ } catch (InterruptedException ignore) {
+ // we need to block
+ }
+ }
+ }
+
+ ConcurrentHashMap<String, AMQPTransportConnectionFactory>
allFac =
+ connectionFactoryManager.getAllFactories();
+
+ for (Map.Entry me : allFac.entrySet()) {
+ String name = (String) me.getKey();
+ Map<String, String> param =
((AMQPTransportConnectionFactory)
+ me.getValue()).getParameters();
+ connectionFactoryManager.removeConnectionFactory(name);
+ connectionFactoryManager.addConnectionFactory(
+ name, new AMQPTransportConnectionFactory(param,
es));
+ }
+
+ String conFacName = entry.getConnectionFactoryName();
+ AMQPTransportConnectionFactory cf = connectionFactoryManager.
+ getConnectionFactory(conFacName);
+ connectionMap.put(
+ entry.getKey(),
+ new AMQPTransportHABrokerEntry(cf.getChannel(),
cf.getConnection()));
+ entry.getLock().release();
+
+
+ while (blockedTasks.isEmpty()) {
+ entry = blockedTasks.take();
+ conFacName = entry.getConnectionFactoryName();
+ cf = connectionFactoryManager.
+ getConnectionFactory(conFacName);
+ connectionMap.put(
+ entry.getKey(),
+ new AMQPTransportHABrokerEntry(cf.getChannel(),
cf.getConnection()));
+ entry.getLock().release();
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (AMQPTransportException e) {
+ log.error("High Availability handler just died!. It's time to
re-start", e);
+ }
+ }
+}
\ No newline at end of file
Modified:
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java?rev=1509452&r1=1509451&r2=1509452&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java
(original)
+++
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java
Thu Aug 1 22:03:03 2013
@@ -151,25 +151,6 @@ public class AMQPTransportPollingTask {
private int noOfConcurrentConsumers = 2;
/**
- * Initial duration(in milliseconds) to suspend the polling task in case
of an error.
- * {@link
org.apache.synapse.transport.amqp.AMQPTransportConstant#PARAMETER_INITIAL_RE_CONNECTION_DURATION}.
- */
- private int initialReconnectDuration = 1000;
-
- /**
- * The progression factor for next re-try calculation.
- * {@link AMQPTransportConstant#PARAMETER_RE_CONNECTION_PROGRESSION_FACTOR}
- */
- private double reconnectionFactor = 2.0;
-
- /**
- * The maximum duration to suspend the polling task. This is to make sure
there is an upper
- * bound for the suspending the polling task in case of an error.
- * {@link AMQPTransportConstant#PARAMETER_MAX_RE_CONNECTION_DURATION}
- */
- private int maxReconnectionDuration = 1000 * 60 * 10;
-
- /**
* The name of the connectionFactory this service is bound to.
* {@link AMQPTransportConstant#PARAMETER_CONNECTION_FACTORY_NAME}
*/
@@ -302,18 +283,6 @@ public class AMQPTransportPollingTask {
this.noOfConcurrentConsumers = noOfConcurrentConsumers;
}
- public void setInitialReconnectDuration(int initialReconnectDuration) {
- this.initialReconnectDuration = initialReconnectDuration;
- }
-
- public void setReconnectionFactor(double reconnectionFactor) {
- this.reconnectionFactor = reconnectionFactor;
- }
-
- public void setMaxReconnectionDuration(int maxReconnectionDuration) {
- this.maxReconnectionDuration = maxReconnectionDuration;
- }
-
public void setConnectionFactoryName(String connectionFactoryName) {
this.connectionFactoryName = connectionFactoryName;
}
@@ -394,22 +363,6 @@ public class AMQPTransportPollingTask {
return noOfConcurrentConsumers;
}
- public int getInitialReconnectDuration() {
- return initialReconnectDuration;
- }
-
- public double getReconnectionFactor() {
- return reconnectionFactor;
- }
-
- public int getMaxReconnectionDuration() {
- return maxReconnectionDuration;
- }
-
- public String getConnectionFactoryName() {
- return connectionFactoryName;
- }
-
public TimeUnit getScheduledTaskTimeUnit() {
return scheduledTaskTimeUnit;
}
Modified:
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTaskFactory.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTaskFactory.java?rev=1509452&r1=1509451&r2=1509452&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTaskFactory.java
(original)
+++
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTaskFactory.java
Thu Aug 1 22:03:03 2013
@@ -168,28 +168,6 @@ public class AMQPTransportPollingTaskFac
}
try {
- Integer initialReconectionDuration =
AMQPTransportUtils.getOptionalIntParameter(
-
AMQPTransportConstant.PARAMETER_INITIAL_RE_CONNECTION_DURATION,
- svcParam, conFacParam);
- if (initialReconectionDuration != null) {
- pt.setInitialReconnectDuration(initialReconectionDuration);
- }
- } catch (AMQPTransportException e) {
- throw new AxisFault("Could not assign the initial re-connection
duration", e);
- }
-
- try {
- Integer reconnectionFactor =
AMQPTransportUtils.getOptionalIntParameter(
-
AMQPTransportConstant.PARAMETER_RE_CONNECTION_PROGRESSION_FACTOR,
- svcParam, conFacParam);
- if (reconnectionFactor != null) {
- pt.setReconnectionFactor(reconnectionFactor);
- }
- } catch (AMQPTransportException e) {
- throw new AxisFault("Could not assign reconnection factor", e);
- }
-
- try {
Integer dispatchingTask =
AMQPTransportUtils.getOptionalIntParameter(
AMQPTransportConstant.PARAMETER_DISPATCHING_TASK_SIZE,
svcParam, conFacParam);
@@ -251,9 +229,6 @@ public class AMQPTransportPollingTaskFac
"Is queue restricted: '" + pt.isQueueRestricted() + "'\n" +
"Is queue auto deleted: '" + pt.isQueueAutoDelete() +
"'\n" +
"Is blocking mode: '" + pt.isBlockingMode() + "'\n" +
- "Initial re-connection duration: '" +
pt.getInitialReconnectDuration() + "(ms)'\n" +
- "Re-connection progression factor: '" +
pt.getReconnectionFactor() + "'\n" +
- "Maximum re-connection duration: '" +
pt.getMaxReconnectionDuration() + "'\n" +
"Number of concurrent consumers: '" +
pt.getNoOfConcurrentConsumers() + "'\n" +
"Number of dispatching task: '" +
pt.getNoOfDispatchingTask() + "'");
}
Modified:
synapse/trunk/java/modules/transports/optional/amqp/src/test/java/org/apache/synapse/tranport/amqp/AMQPTransportUtilsTest.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/test/java/org/apache/synapse/tranport/amqp/AMQPTransportUtilsTest.java?rev=1509452&r1=1509451&r2=1509452&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/optional/amqp/src/test/java/org/apache/synapse/tranport/amqp/AMQPTransportUtilsTest.java
(original)
+++
synapse/trunk/java/modules/transports/optional/amqp/src/test/java/org/apache/synapse/tranport/amqp/AMQPTransportUtilsTest.java
Thu Aug 1 22:03:03 2013
@@ -40,7 +40,7 @@ public class AMQPTransportUtilsTest exte
cfMap.put(AMQPTransportConstant.PARAMETER_EXCHANGE_TYPE, "direct");
cfMap.put(AMQPTransportConstant.PARAMETER_QUEUE_DURABLE, "true");
-
cfMap.put(AMQPTransportConstant.PARAMETER_INITIAL_RE_CONNECTION_DURATION, "10");
+ cfMap.put(AMQPTransportConstant.PARAM_INITIAL_RE_CONNECTION_DURATION,
"10");
}
public void testGetStringProperty() throws Exception {
@@ -109,9 +109,9 @@ public class AMQPTransportUtilsTest exte
public void testGetOptionalIntParameter() throws Exception {
assertEquals("Invalid value",
-
Integer.parseInt(cfMap.get(AMQPTransportConstant.PARAMETER_INITIAL_RE_CONNECTION_DURATION)),
+
Integer.parseInt(cfMap.get(AMQPTransportConstant.PARAM_INITIAL_RE_CONNECTION_DURATION)),
AMQPTransportUtils.getOptionalIntParameter(
-
AMQPTransportConstant.PARAMETER_INITIAL_RE_CONNECTION_DURATION, svcMap,
cfMap).intValue());
+
AMQPTransportConstant.PARAM_INITIAL_RE_CONNECTION_DURATION, svcMap,
cfMap).intValue());
assertEquals("Invalid value",
Integer.parseInt(svcMap.get(AMQPTransportConstant.PARAMETER_NO_OF_CONCURRENT_CONSUMERS)),