Rob Davies wrote:
Hi Manuel,
this looks like a good catch! Would you mind opening a JIRA issue for
this, just so it's easier to track? I'll look at it as soon as I can.
cheers,
Thanks, Rob. I've been experimenting further and have made some
changes that allow a DestinationBridge to have its Connections
replaced. I've added these new abstract methods:
protected abstract void setConnectionForConsumer(Connection consumerConnection);
protected abstract void setConnectionForProducer(Connection producerConnection);
And implemented them in the subclasses QueueBridge and
TopicBridge, each just checking that the argument can be cast to
the right javax.jms.Connection subtype (QueueConnection or
TopicConnection).
Also, I've changed the abstract method restartProducerConnection
of JmsConnector so that it returns the new connection, which can
then be injected back into the DestinationBridge through the new
setter methods.
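To make the return-and-inject idea concrete, here is a minimal sketch. It does not use the real ActiveMQ or JMS classes: every Sketch* type below is a hypothetical stand-in, and only the method names restartProducerConnection, setConnectionForProducer and restartProducer come from the patch.

```java
// Hypothetical stand-ins for javax.jms.Connection and its queue/topic subtypes.
class SketchConnection {
    final String id;
    SketchConnection(String id) { this.id = id; }
}

class SketchQueueConnection extends SketchConnection {
    SketchQueueConnection(String id) { super(id); }
}

class SketchTopicConnection extends SketchConnection {
    SketchTopicConnection(String id) { super(id); }
}

// Stand-in for the connector: the restart hands the new connection back.
class SketchQueueConnector {
    private int generation = 1;
    SketchQueueConnection outbound = new SketchQueueConnection("queue-conn-1");

    SketchConnection restartProducerConnection() {
        generation++;
        outbound = new SketchQueueConnection("queue-conn-" + generation);
        return outbound;
    }
}

// Stand-in for QueueBridge: it keeps its own reference, updated via the setter.
class SketchQueueBridge {
    private final SketchQueueConnector connector;
    SketchQueueConnection producerConnection;

    SketchQueueBridge(SketchQueueConnector connector) {
        this.connector = connector;
        this.producerConnection = connector.outbound;
    }

    // Accepts only the queue flavour; any other type is silently ignored.
    void setConnectionForProducer(SketchConnection connection) {
        if (connection instanceof SketchQueueConnection) {
            this.producerConnection = (SketchQueueConnection) connection;
        }
    }

    void restartProducer() {
        setConnectionForProducer(connector.restartProducerConnection());
    }
}

class InjectConnectionDemo {
    public static void main(String[] args) {
        SketchQueueBridge bridge = new SketchQueueBridge(new SketchQueueConnector());
        bridge.restartProducer();
        System.out.println(bridge.producerConnection.id);

        // A connection of the wrong flavour leaves the bridge untouched.
        bridge.setConnectionForProducer(new SketchTopicConnection("topic-conn-1"));
        System.out.println(bridge.producerConnection.id);
    }
}
```

One thing the sketch makes visible: with an instanceof guard, a connection of the wrong type is dropped without any notice. Logging or throwing in that case might be worth considering, but that is a design choice and not something the patch does.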
Also, a couple of changes in the DestinationBridge onMessage method
to handle correctly (I hope) the variable 'attempt', which is now a
member field.
I don't know if this is the right path to follow, but it seems to
work fine. However, I think that only *new messages* are being sent
to the remote bridged broker, and not the ones that were first tried
during the remote broker failure.
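One possible way to get the failed message itself delivered would be to retry it inside onMessage instead of waiting for the next message to trigger the restart. The following is only a sketch under stated assumptions: RetrySketch and its Sender interface are hypothetical stand-ins, messages are plain strings, and only the names onMessage, restartProducer, attempt and maximumRetries echo the real class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the bridge; not the real DestinationBridge.
class RetrySketch {
    // Hypothetical replacement for the JMS producer.
    interface Sender {
        void send(String message) throws Exception;
    }

    private final Sender sender;
    private final int maximumRetries;
    int restarts = 0;

    RetrySketch(Sender sender, int maximumRetries) {
        this.sender = sender;
        this.maximumRetries = maximumRetries;
    }

    // Stand-in for restartProducer(); here it only counts the calls.
    void restartProducer() {
        restarts++;
    }

    // Retries the same message until it is sent or the retries run out.
    // Returns true if the message was sent.
    boolean onMessage(String message) {
        for (int attempt = 0; attempt <= maximumRetries; attempt++) {
            try {
                if (attempt > 0) {
                    restartProducer();
                }
                sender.send(message);
                return true;
            } catch (Exception e) {
                // Fall through and try the same message again.
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        int[] failuresLeft = {2}; // simulate two failed sends
        RetrySketch bridge = new RetrySketch(msg -> {
            if (failuresLeft[0] > 0) {
                failuresLeft[0]--;
                throw new Exception("remote broker down");
            }
            delivered.add(msg);
        }, 10);

        System.out.println("sent: " + bridge.onMessage("m1"));
        System.out.println("delivered: " + delivered);
        System.out.println("restarts: " + bridge.restarts);
    }
}
```

With a loop like this the counter can stay local, because every retry happens within a single onMessage call. A real bridge would probably also want a delay between attempts, which the sketch leaves out.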
Regards.
Rob
On 27 Sep 2006, at 14:48, Manuel Teira wrote:
Hello.
Looking at the code in DestinationBridge
(org.apache.activemq.network.jms), I see that when the delivery
of a message to the remote broker fails, there's a counter,
implemented as the variable 'attempt', that seems intended to
record failures and trigger a restart of the producer.
But shouldn't that variable be a member of the
DestinationBridge class instead of a local variable of the
onMessage method? As it stands, the variable is initialized to
zero on every onMessage call, so restartProducer is never
called:
public void onMessage(Message message) {
    if (started.get() && message != null) {
        int attempt = 0;
        try {
            if (attempt > 0) {
                restartProducer();
            }
            ...
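The effect of the counter's scope can be reproduced in isolation. The sketch below is not the real DestinationBridge: AttemptCounterDemo is a hypothetical stand-in, and it assumes that a failed send increments the counter (the excerpt above elides that part).

```java
// Hypothetical stand-in for the bridge; only the counter logic is modelled.
class AttemptCounterDemo {
    // Field version of the counter: it survives across onMessage calls.
    private int attempt = 0;
    int restartsWithLocal = 0;
    int restartsWithField = 0;

    // Mirrors the excerpt: the counter is re-created on every call.
    void onMessageWithLocal(boolean sendFails) {
        int localAttempt = 0;
        if (localAttempt > 0) {
            restartsWithLocal++; // never reached
        }
        if (sendFails) {
            localAttempt++; // lost as soon as the method returns
        }
    }

    // Same logic with the counter held in a field.
    void onMessageWithField(boolean sendFails) {
        if (attempt > 0) {
            restartsWithField++;
        }
        if (sendFails) {
            attempt++;
        } else {
            attempt = 0; // reset after a successful send
        }
    }

    public static void main(String[] args) {
        AttemptCounterDemo demo = new AttemptCounterDemo();
        boolean[] sendFails = {true, true, false};
        for (boolean fails : sendFails) {
            demo.onMessageWithLocal(fails);
            demo.onMessageWithField(fails);
        }
        System.out.println("restarts (local counter): " + demo.restartsWithLocal); // 0
        System.out.println("restarts (field counter): " + demo.restartsWithField); // 2
    }
}
```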
In my tests, I've tried changing the variable 'attempt' to be an
object member. Now restartProducer() is called, but it seems
that the new connection is not being used. Looking at the code,
I don't understand how calling
jmsConnector.restartProducerConnection()
is really changing the environment of
createProducer()
in the QueueBridge subclass.
For example, for the QueueBridge subclass, createProducer is
using the member producerConnection:
protected MessageProducer createProducer() throws JMSException{
    producerSession=producerConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
    producer = producerSession.createSender(null);
    return producer;
}
but I think that this is no longer related to the
JmsQueueConnector outboundQueueConnection, which is the only
member affected by jmsConnector.restartProducerConnection(). I
don't even know how, or by whom, the QueueBridge
producerConnection member is set up during initialization; I
suppose through a call to setProducerConnection.
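The suspected problem is ordinary Java reference semantics: if the bridge copied the connection reference once at setup, reassigning the connector's field later does not change the bridge's copy. A minimal sketch, in which all Fake* classes are hypothetical stand-ins and not ActiveMQ or JMS types:

```java
// Hypothetical stand-in for a JMS connection.
class FakeConnection {
    final String id;
    FakeConnection(String id) { this.id = id; }
}

// Hypothetical stand-in for JmsQueueConnector.
class FakeConnector {
    FakeConnection outbound = new FakeConnection("conn-1");

    // Same shape as the original void method: replaces only the connector's field.
    void restartProducerConnection() {
        outbound = new FakeConnection("conn-2");
    }
}

// Hypothetical stand-in for QueueBridge.
class FakeBridge {
    FakeConnection producerConnection; // copied once at setup time

    String createProducer() {
        return "producer on " + producerConnection.id;
    }
}

class StaleReferenceDemo {
    static String producerAfterRestart() {
        FakeConnector connector = new FakeConnector();
        FakeBridge bridge = new FakeBridge();
        bridge.producerConnection = connector.outbound; // initial wiring

        connector.restartProducerConnection(); // connector now holds conn-2
        return bridge.createProducer();        // bridge still uses conn-1
    }

    public static void main(String[] args) {
        System.out.println(producerAfterRestart()); // prints "producer on conn-1"
    }
}
```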
I think that a solution would be to be able to change the
producerConnection of QueueBridge when we restart the producer.
But for that, we would need to implement restartProducer in the
DestinationBridge subclasses.
Any hints or ideas? I really need to get remote bridge
reconnections working urgently, so please let me know if you
need further info.
Regards.
Index: src/main/java/org/apache/activemq/network/jms/QueueBridge.java
===================================================================
--- src/main/java/org/apache/activemq/network/jms/QueueBridge.java (revision 450397)
+++ src/main/java/org/apache/activemq/network/jms/QueueBridge.java (working copy)
@@ -161,5 +161,17 @@
protected Connection getConnectionForProducer(){
return getProducerConnection();
}
-
+
+ protected void setConnectionForConsumer(Connection consumerConnection){
+ if (consumerConnection instanceof QueueConnection) {
+ this.consumerConnection = (QueueConnection) consumerConnection;
+ }
+ }
+
+ protected void setConnectionForProducer(Connection producerConnection){
+ if (producerConnection instanceof QueueConnection) {
+ this.producerConnection = (QueueConnection) producerConnection;
+ }
+ }
Index: src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java
===================================================================
--- src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java (revision 450397)
+++ src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java (working copy)
@@ -188,9 +188,11 @@
}
- public void restartProducerConnection() throws NamingException, JMSException {
+ public Connection restartProducerConnection() throws NamingException, JMSException {
outboundTopicConnection = null;
initializeForeignTopicConnection();
+ return outboundTopicConnection;
}
protected void initializeForeignTopicConnection() throws NamingException,JMSException{
Index: src/main/java/org/apache/activemq/network/jms/JmsConnector.java
===================================================================
--- src/main/java/org/apache/activemq/network/jms/JmsConnector.java (revision 450397)
+++ src/main/java/org/apache/activemq/network/jms/JmsConnector.java (working copy)
@@ -319,4 +319,5 @@
this.name = name;
}
- public abstract void restartProducerConnection() throws NamingException, JMSException;
+ public abstract Connection restartProducerConnection() throws NamingException, JMSException;
Index: src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java
===================================================================
--- src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java (revision 450397)
+++ src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java (working copy)
@@ -186,9 +186,11 @@
this.outboundQueueConnectionFactory=foreignQueueConnectionFactory;
}
- public void restartProducerConnection() throws NamingException, JMSException {
+ public Connection restartProducerConnection() throws NamingException, JMSException {
outboundQueueConnection = null;
initializeForeignQueueConnection();
+ return outboundQueueConnection;
}
protected void initializeForeignQueueConnection() throws NamingException,JMSException{
Index: src/main/java/org/apache/activemq/network/jms/TopicBridge.java
===================================================================
--- src/main/java/org/apache/activemq/network/jms/TopicBridge.java (revision 450397)
+++ src/main/java/org/apache/activemq/network/jms/TopicBridge.java (working copy)
@@ -186,3 +186,16 @@
protected Connection getConnectionForProducer(){
return getProducerConnection();
}
+
+ protected void setConnectionForConsumer(Connection consumerConnection){
+ if (consumerConnection instanceof TopicConnection) {
+ this.consumerConnection = (TopicConnection) consumerConnection;
+ }
+ }
+
+ protected void setConnectionForProducer(Connection producerConnection){
+ if (producerConnection instanceof TopicConnection) {
+ this.producerConnection = (TopicConnection) producerConnection;
+ }
+ }
Index: src/main/java/org/apache/activemq/network/jms/DestinationBridge.java
===================================================================
--- src/main/java/org/apache/activemq/network/jms/DestinationBridge.java (revision 450397)
+++ src/main/java/org/apache/activemq/network/jms/DestinationBridge.java (working copy)
@@ -45,6 +45,7 @@
protected boolean doHandleReplyTo = true;
protected JmsConnector jmsConnector;
private int maximumRetries = 10;
+ private int attempt = 0;
/**
* @return Returns the consumer.
@@ -112,7 +113,6 @@
public void onMessage(Message message) {
if (started.get() && message != null) {
- int attempt = 0;
try {
if (attempt > 0) {
restartProducer();
@@ -132,6 +132,7 @@
converted = jmsMessageConvertor.convert(message);
}
sendMessage(converted);
+ attempt = 0;
message.acknowledge();
}
catch (Exception e) {
@@ -173,6 +174,10 @@
protected abstract Connection getConnectionForProducer();
+ protected abstract void setConnectionForConsumer(Connection consumerConnection);
+
+ protected abstract void setConnectionForProducer(Connection producerConnection);
+
protected void restartProducer() throws JMSException, NamingException {
try {
getConnectionForProducer().close();
@@ -180,6 +185,7 @@
catch (Exception e) {
log.debug("Ignoring failure to close producer connection: " + e, e);
}
- jmsConnector.restartProducerConnection();
+ setConnectionForProducer(jmsConnector.restartProducerConnection());
createProducer();
}