Author: rajikak
Date: Fri Aug 2 00:27:32 2013
New Revision: 1509495
URL: http://svn.apache.org/r1509495
Log:
added ha implementations.
Modified:
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportEndpoint.java
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportListener.java
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/connectionfactory/AMQPTransportConnectionFactoryManager.java
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportReconnectHandler.java
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTaskFactory.java
Modified:
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportEndpoint.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportEndpoint.java?rev=1509495&r1=1509494&r2=1509495&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportEndpoint.java (original)
+++ synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportEndpoint.java Fri Aug 2 00:27:32 2013
@@ -24,7 +24,6 @@ import org.apache.synapse.transport.amqp
import java.util.HashSet;
import java.util.Set;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
/**
@@ -77,7 +76,8 @@ public class AMQPTransportEndpoint exten
service,
workerPool,
this,
- conFac);
+ conFac,
+ transportReceiver.getHaHandler());
} catch (AMQPTransportException e) {
throw new AxisFault("Could not load the AMQP endpoint configuration, " + e.getMessage(), e);
Modified:
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportListener.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportListener.java?rev=1509495&r1=1509494&r2=1509495&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportListener.java (original)
+++ synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportListener.java Fri Aug 2 00:27:32 2013
@@ -41,7 +41,7 @@ public class AMQPTransportListener exten
private ExecutorService connectionFactoryES;
- private AMQPTransportReconnectHandler haHandler;
+ private AMQPTransportReconnectHandler haHandlerTask;
@Override
protected void doInit() throws AxisFault {
@@ -71,14 +71,14 @@ public class AMQPTransportListener exten
int maxReconnectionDuration = AMQPTransportUtils.getIntProperty(
AMQPTransportConstant.PARAM_MAX_RE_CONNECTION_DURATION, 1000 * 60 * 10);
- haHandler = new AMQPTransportReconnectHandler(
+ haHandlerTask = new AMQPTransportReconnectHandler(
connectionFactoryES,
maxReconnectionDuration,
reconnectionProgressionFactor,
initialReconnectDuration,
connectionFactoryManager);
- new Thread(haHandler, "AMQP-HA-handler-task").start();
+ new Thread(haHandlerTask, "AMQP-HA-handler-task").start();
log.info("AMQP transport listener initializing..");
}
@@ -137,4 +137,8 @@ public class AMQPTransportListener exten
return connectionFactoryManager.getConnectionFactory(
AMQPTransportConstant.DEFAULT_CONNECTION_FACTORY_NAME);
}
+
+ public AMQPTransportReconnectHandler getHaHandler(){
+ return haHandlerTask;
+ }
}
Modified:
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/connectionfactory/AMQPTransportConnectionFactoryManager.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/connectionfactory/AMQPTransportConnectionFactoryManager.java?rev=1509495&r1=1509494&r2=1509495&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/connectionfactory/AMQPTransportConnectionFactoryManager.java (original)
+++ synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/connectionfactory/AMQPTransportConnectionFactoryManager.java Fri Aug 2 00:27:32 2013
@@ -98,8 +98,10 @@ public class AMQPTransportConnectionFact
} catch (IOException e) {
throw new AMQPTransportException("Could not remove the connection '" + name + "'", e);
}
+ } else {
+ throw new AMQPTransportException("No connection factory found with the name '"
+ + name + "'");
}
- throw new AMQPTransportException("No connection factory found with the name '" + name + "'");
}
/**
Modified:
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportReconnectHandler.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportReconnectHandler.java?rev=1509495&r1=1509494&r2=1509495&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportReconnectHandler.java (original)
+++ synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/ha/AMQPTransportReconnectHandler.java Fri Aug 2 00:27:32 2013
@@ -97,7 +97,7 @@ public class AMQPTransportReconnectHandl
" The retry duration is set to initial reconnection duration " +
"value(" + initialReconnectDuration +
"s)");
}
- log.error("The reconnection attempt number '" + count++ + "' failed. Next " +
+ log.info("The reconnection attempt number '" + count++ + "' failed. Next " +
"re-try will be after '" + (retryDuration / 1000) + "' seconds");
try {
Thread.sleep(retryDuration);
@@ -117,6 +117,7 @@ public class AMQPTransportReconnectHandl
connectionFactoryManager.removeConnectionFactory(name);
connectionFactoryManager.addConnectionFactory(
name, new AMQPTransportConnectionFactory(param, es));
+ log.info("A new connection factory was created for -> '" + name + "'");
}
String conFacName = entry.getConnectionFactoryName();
@@ -127,7 +128,6 @@ public class AMQPTransportReconnectHandl
new AMQPTransportHABrokerEntry(cf.getChannel(), cf.getConnection()));
entry.getLock().release();
-
while (blockedTasks.isEmpty()) {
entry = blockedTasks.take();
conFacName = entry.getConnectionFactoryName();
@@ -136,13 +136,23 @@ public class AMQPTransportReconnectHandl
connectionMap.put(
entry.getKey(),
new AMQPTransportHABrokerEntry(cf.getChannel(), cf.getConnection()));
+ log.info("The task with key '" + entry.getKey() + "' was combined with a new " +
+ "connection factory");
entry.getLock().release();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (AMQPTransportException e) {
- log.error("High Availability handler just died!. It's time to re-start", e);
+ log.error("High Availability handler just died!. It's time to reboot the system.", e);
}
}
+
+ public BlockingQueue<AMQPTransportHAEntry> getBlockedTasks() {
+ return blockedTasks;
+ }
+
+ public ConcurrentMap<String, AMQPTransportHABrokerEntry> getConnectionMap() {
+ return connectionMap;
+ }
}
\ No newline at end of file
Modified:
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java?rev=1509495&r1=1509494&r2=1509495&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java (original)
+++ synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java Fri Aug 2 00:27:32 2013
@@ -29,16 +29,17 @@ import org.apache.axis2.transport.http.H
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.transport.amqp.*;
+import org.apache.synapse.transport.amqp.ha.AMQPTransportHABrokerEntry;
+import org.apache.synapse.transport.amqp.ha.AMQPTransportHAEntry;
+import org.apache.synapse.transport.amqp.ha.AMQPTransportReconnectHandler;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.UUID;
+import java.util.concurrent.*;
/**
* The polling task deploy for each services exposed on AMQP transport. This task
@@ -207,6 +208,8 @@ public class AMQPTransportPollingTask {
private ScheduledFuture<?> pollingTaskFuture;
+ private AMQPTransportReconnectHandler haHandler;
+
public void setUseTx(boolean useTx) {
isUseTx = useTx;
}
@@ -383,6 +386,10 @@ public class AMQPTransportPollingTask {
this.responseConnectionFactory = responseConnectionFactory;
}
+ public void setHaHandler(AMQPTransportReconnectHandler haHandler) {
+ this.haHandler = haHandler;
+ }
+
/**
* Start the polling task for this service
*/
@@ -507,6 +514,26 @@ public class AMQPTransportPollingTask {
} catch (ShutdownSignalException e) {
log.error("Polling task for service '" + serviceName + "' received a " +
"shutdown signal", e);
+ Semaphore available = new Semaphore(0, true);
+ String key = UUID.randomUUID().toString();
+ haHandler.getBlockedTasks().add(new AMQPTransportHAEntry(
+ available, key, connectionFactoryName));
+ try {
+ available.acquire();
+ } catch (InterruptedException ie) {
+ log.error("The blocking semaphore received an interrupted", e);
+ Thread.currentThread().interrupt();
+ return;
+ }
+
+ AMQPTransportHABrokerEntry brokerEntry = haHandler.getConnectionMap().get(key);
+ if (brokerEntry == null) {
+ log.error("No new connection factory were found for key '" + key + "'");
+ } else {
+ setChannel(brokerEntry.getChannel());
+ this.queueingConsumer = new QueueingConsumer(channel);
+ }
+
} catch (ConsumerCancelledException e) {
log.error("Polling task for service '" + serviceName + "' received a " +
"cancellation signal");
Modified:
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTaskFactory.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTaskFactory.java?rev=1509495&r1=1509494&r2=1509495&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTaskFactory.java (original)
+++ synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTaskFactory.java Fri Aug 2 00:27:32 2013
@@ -19,6 +19,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.transport.amqp.*;
import org.apache.synapse.transport.amqp.connectionfactory.AMQPTransportConnectionFactory;
+import org.apache.synapse.transport.amqp.ha.AMQPTransportReconnectHandler;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
@@ -36,7 +37,8 @@ public class AMQPTransportPollingTaskFac
AxisService service,
ScheduledExecutorService pool,
AMQPTransportEndpoint endpoint,
- AMQPTransportConnectionFactory connectionFactory) throws AxisFault {
+ AMQPTransportConnectionFactory connectionFactory,
+ AMQPTransportReconnectHandler haHandler) throws AxisFault {
Map<String, String> svcParam =
AMQPTransportUtils.getServiceStringParameters(service.getParameters());
@@ -48,6 +50,7 @@ public class AMQPTransportPollingTaskFac
pt.setServiceName(service.getName());
pt.setEndpoint(endpoint);
pt.setPollingTaskScheduler(pool);
+ pt.setHaHandler(haHandler);
// set buffers to hold request/response messages for this task
pt.setBuffers(new AMQPTransportBuffers());
@@ -89,7 +92,6 @@ public class AMQPTransportPollingTaskFac
pt.setInternalExchange(isInternalExchange);
}
-
pt.setChannel(connectionFactory.getChannel());
pt.setConnectionFactoryName(connectionFactory.getName());