I'm sorry to say that I can confirm this behaviour with the latest changes:

1.-Start the remote broker.
2.-Start the activemq broker with a queued bridge.
3.-Send a message to the bridged queue: The message is bridged correctly.
4.-Stop the remote broker.
5.-Send a message to the bridged queue. It fails on QueueBridge.sendMessage as the producer is closed.
6.-Send a new message to the bridged queue. After the last changes, it tries to call 'restartProducer' but it fails, because the remote broker is down.
7.-Start the remote broker.
8.-Send a new message to the bridged queue. restartProducer is called again, the producer and its connection are successfully recreated. But ONLY the new message reaches the remote broker. I don't see any attempt to send the old ones. In the JMX console, I can see, for this queue:

ConsumerCount: 1
DequeueCount: 4
EnqueueCount: 4
QueueSize: 0

But only two messages were delivered to the remote broker.
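
To make the symptom concrete, here is a toy model of the eight steps above. It is only a sketch under one assumption that I have not verified in the ActiveMQ code: that the local queue counts a message as dequeued even when forwarding it to the remote broker fails. The class LostMessageModel and all of its members are hypothetical stand-ins, not real ActiveMQ or JMS types.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model, not ActiveMQ code: a queue whose consumer dequeues each message
// whether or not forwarding it to the remote side succeeded (an assumption).
class LostMessageModel {
    int enqueueCount = 0;
    int dequeueCount = 0;
    boolean remoteUp = true;
    final Deque<String> queue = new ArrayDeque<>();
    final List<String> deliveredToRemote = new ArrayList<>();

    void send(String msg) {
        queue.add(msg);
        enqueueCount++;
        dispatch();
    }

    private void dispatch() {
        String msg = queue.poll();
        dequeueCount++;                  // counted as consumed regardless of the outcome
        if (remoteUp) {
            deliveredToRemote.add(msg);  // forwarded successfully
        }
        // else: under the assumption above, the failed message is simply dropped
    }

    public static void main(String[] args) {
        LostMessageModel m = new LostMessageModel();
        m.send("m1");        // step 3: remote broker up
        m.remoteUp = false;  // step 4
        m.send("m2");        // step 5: forwarding fails
        m.send("m3");        // step 6: forwarding fails again
        m.remoteUp = true;   // step 7
        m.send("m4");        // step 8: forwarded
        System.out.println("EnqueueCount: " + m.enqueueCount);          // 4
        System.out.println("DequeueCount: " + m.dequeueCount);          // 4
        System.out.println("QueueSize: " + m.queue.size());             // 0
        System.out.println("Delivered: " + m.deliveredToRemote.size()); // 2
    }
}
```

The model reproduces the JMX counters and the two delivered messages, which suggests (but does not prove) that the failed messages are being consumed rather than left on the queue for redelivery.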


What could have happened to those messages? Any idea which classes I should check?

Regards.



Manuel Teira wrote:
Rob Davies wrote:
Hi Manuel,

this looks like a good catch! Would you mind opening a JIRA issue for this - just so it's easier to track - I'll look at this as soon as I can

cheers,

Thanks, Rob. I've been experimenting further and have made some changes to allow a DestinationBridge to have its Connections replaced. I've created these new abstract methods:

protected abstract void setConnectionForConsumer(Connection consumerConnection);

protected abstract void setConnectionForProducer(Connection producerConnection);

I've implemented them in the subclasses QueueBridge and TopicBridge, where they just check that the argument can be cast to the right javax.jms.Connection subclass.

I've also changed the abstract method restartProducerConnection of JmsConnector so that it returns the new connection, which can then be injected back into the DestinationBridge through the new setter methods.
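
As a minimal sketch of that idea, the following uses plain stand-in classes rather than the real JMS and ActiveMQ types (FakeConnection, FakeConnector and FakeBridge are hypothetical names used only for illustration). It shows the connector returning the freshly created connection and the bridge storing it before recreating its producer:

```java
// Stand-in types only; the real code uses javax.jms.Connection and the
// ActiveMQ connector/bridge classes.
class ConnectionInjectionSketch {

    /** Hypothetical placeholder for javax.jms.Connection. */
    static class FakeConnection {
        final int id;
        boolean closed = false;
        FakeConnection(int id) { this.id = id; }
        void close() { closed = true; }
    }

    /** Plays the role of JmsConnector: owns the outbound connection. */
    static class FakeConnector {
        private int nextId = 1;
        FakeConnection outbound = new FakeConnection(nextId++);

        // Returns the new connection so that the caller can store it.
        FakeConnection restartProducerConnection() {
            outbound = new FakeConnection(nextId++);
            return outbound;
        }
    }

    /** Plays the role of a DestinationBridge subclass. */
    static class FakeBridge {
        private final FakeConnector connector;
        private FakeConnection producerConnection;
        int producerBoundTo = -1; // id of the connection the producer was created on

        FakeBridge(FakeConnector connector) {
            this.connector = connector;
            this.producerConnection = connector.outbound;
            createProducer();
        }

        void setConnectionForProducer(FakeConnection c) { this.producerConnection = c; }

        void createProducer() { producerBoundTo = producerConnection.id; }

        void restartProducer() {
            producerConnection.close();
            // Without this setter call the bridge would keep using the closed connection.
            setConnectionForProducer(connector.restartProducerConnection());
            createProducer();
        }
    }

    public static void main(String[] args) {
        FakeConnector connector = new FakeConnector();
        FakeBridge bridge = new FakeBridge(connector);
        bridge.restartProducer();
        // The producer is now bound to the connector's current connection.
        System.out.println(bridge.producerBoundTo == connector.outbound.id); // true
    }
}
```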

I've also made a couple of changes to the onMessage method of DestinationBridge so that it correctly manages (I hope) the variable 'attempt', which is now a field.

I don't know if this is the path to follow, but it seems to work fine. Anyway, I think that only *new messages* are being sent to the remote bridged broker, and not the ones that were first tried during the remote broker failure.


Regards.






Rob
On 27 Sep 2006, at 14:48, Manuel Teira wrote:

Hello.

Looking at the code in DestinationBridge (org.apache.activemq.network.jms), I see that when the delivery of a message to the remote broker fails, there is a counter, implemented as the variable 'attempt', that seems intended to record failures and trigger a restart of the producer.

But shouldn't that variable be a member of the DestinationBridge class instead of a local variable of the onMessage method? As written, the variable is initialized to zero on every onMessage call, so restartProducer is never called:

    public void onMessage(Message message) {
        if (started.get() && message != null) {
            int attempt = 0;
            try {
                if (attempt > 0) {
                    restartProducer();
                }
...
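
To show the difference in isolation, here is a small self-contained sketch with hypothetical stand-in classes (none of them are ActiveMQ types). It assumes that a failed send increments the counter, which is my reading of the elided part of onMessage, and it only counts how often the restart branch would be taken:

```java
// Stand-in demo (not the ActiveMQ classes): why a local 'attempt' never
// triggers a restart, while a field does.
class AttemptCounterDemo {

    /** Mirrors the current shape: the counter is re-created on every call. */
    static class LocalCounterBridge {
        int restarts = 0;

        void onMessage(boolean sendFails) {
            int attempt = 0;       // reset to zero on every invocation
            if (attempt > 0) {     // therefore never true
                restarts++;
            }
            if (sendFails) {
                attempt++;         // lost as soon as the method returns
            }
        }
    }

    /** Proposed shape: the counter survives between calls. */
    static class FieldCounterBridge {
        int restarts = 0;
        private int attempt = 0;

        void onMessage(boolean sendFails) {
            if (attempt > 0) {     // true after a previous failure
                restarts++;
            }
            if (sendFails) {
                attempt++;
            } else {
                attempt = 0;       // a successful send clears the failure state
            }
        }
    }

    public static void main(String[] args) {
        LocalCounterBridge local = new LocalCounterBridge();
        FieldCounterBridge field = new FieldCounterBridge();
        boolean[] sendFails = {true, true, false}; // two failures, then success
        for (boolean fails : sendFails) {
            local.onMessage(fails);
            field.onMessage(fails);
        }
        System.out.println("local restarts: " + local.restarts); // 0
        System.out.println("field restarts: " + field.restarts); // 2
    }
}
```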

In my tests, I've tried changing the variable 'attempt' to be an object member. Now restartProducer() is called, but it seems that the new connection is not being used. Looking at the code, I don't understand how calling

jmsConnector.restartProducerConnection()


is really changing the environment of

createProducer()

in the QueueBridge subclass.

For example, for the QueueBridge subclass, createProducer is using the member producerConnection:

    protected MessageProducer createProducer() throws JMSException{
        producerSession=producerConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
        producer = producerSession.createSender(null);
        return producer;
    }

but I think that this member is no longer related to the outboundQueueConnection of JmsQueueConnector, which is the only member affected by jmsConnector.restartProducerConnection(). I don't even know how, or by whom, the producerConnection member of QueueBridge is set during initialization; I suppose it is done by calling setProducerConnection.
I think a solution would be to make it possible to change the producerConnection of QueueBridge when we restart the producer. But for that, we would need to implement restartProducer in the DestinationBridge subclasses.


Any hints or ideas? I urgently need remote bridge reconnection to work, so please let me know if you need further info.

Regards.






Index: src/main/java/org/apache/activemq/network/jms/QueueBridge.java
===================================================================
--- src/main/java/org/apache/activemq/network/jms/QueueBridge.java (revision 450397)
+++ src/main/java/org/apache/activemq/network/jms/QueueBridge.java (working copy)
@@ -161,5 +161,17 @@
     protected Connection getConnectionForProducer(){
         return getProducerConnection();
     }
-
+
+    protected void setConnectionForConsumer(Connection consumerConnection){
+        if (consumerConnection instanceof QueueConnection) {
+            this.consumerConnection = (QueueConnection)consumerConnection;
+        }
+    }
+
+    protected void setConnectionForProducer(Connection producerConnection){
+        if (producerConnection instanceof QueueConnection) {
+            this.producerConnection = (QueueConnection)producerConnection;
+        }
+    }
Index: src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java
===================================================================
--- src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java (revision 450397)
+++ src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java (working copy)
@@ -188,9 +188,11 @@
     }
-    public void restartProducerConnection() throws NamingException, JMSException {
+    public Connection restartProducerConnection() throws NamingException, JMSException {
         outboundTopicConnection = null;
         initializeForeignTopicConnection();
+        return outboundTopicConnection;
     }
     protected void initializeForeignTopicConnection() throws NamingException,JMSException{
Index: src/main/java/org/apache/activemq/network/jms/JmsConnector.java
===================================================================
--- src/main/java/org/apache/activemq/network/jms/JmsConnector.java (revision 450397)
+++ src/main/java/org/apache/activemq/network/jms/JmsConnector.java (working copy)
@@ -319,4 +319,5 @@
         this.name = name;
     }
-    public abstract void restartProducerConnection() throws NamingException, JMSException;
+    public abstract Connection restartProducerConnection() throws NamingException, JMSException;
Index: src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java
===================================================================
--- src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java (revision 450397)
+++ src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java (working copy)
@@ -186,9 +186,11 @@
         this.outboundQueueConnectionFactory=foreignQueueConnectionFactory;
     }
-    public void restartProducerConnection() throws NamingException, JMSException {
+    public Connection restartProducerConnection() throws NamingException, JMSException {
         outboundQueueConnection = null;
         initializeForeignQueueConnection();
+        return outboundQueueConnection;
     }
     protected void initializeForeignQueueConnection() throws NamingException,JMSException{
Index: src/main/java/org/apache/activemq/network/jms/TopicBridge.java
===================================================================
--- src/main/java/org/apache/activemq/network/jms/TopicBridge.java (revision 450397)
+++ src/main/java/org/apache/activemq/network/jms/TopicBridge.java (working copy)
@@ -186,3 +186,16 @@
     protected Connection getConnectionForProducer(){
         return getProducerConnection();
     }
+
+    protected void setConnectionForConsumer(Connection consumerConnection){
+        if (consumerConnection instanceof TopicConnection) {
+            this.consumerConnection = (TopicConnection)consumerConnection;
+        }
+    }
+
+    protected void setConnectionForProducer(Connection producerConnection){
+        if (producerConnection instanceof TopicConnection) {
+            this.producerConnection = (TopicConnection)producerConnection;
+        }
+    }
Index: src/main/java/org/apache/activemq/network/jms/DestinationBridge.java
===================================================================
--- src/main/java/org/apache/activemq/network/jms/DestinationBridge.java (revision 450397)
+++ src/main/java/org/apache/activemq/network/jms/DestinationBridge.java (working copy)
@@ -45,6 +45,7 @@
     protected boolean doHandleReplyTo = true;
     protected JmsConnector jmsConnector;
     private int maximumRetries = 10;
+    private int attempt = 0;
     /**
      * @return Returns the consumer.
@@ -112,7 +113,6 @@
     public void onMessage(Message message) {
         if (started.get() && message != null) {
-            int attempt = 0;
             try {
                 if (attempt > 0) {
                     restartProducer();
@@ -132,6 +132,7 @@
                     converted = jmsMessageConvertor.convert(message);
                 }
                 sendMessage(converted);
+                attempt = 0;
                 message.acknowledge();
             }
             catch (Exception e) {
@@ -173,6 +174,10 @@
     protected abstract Connection getConnectionForProducer();
+    protected abstract void setConnectionForConsumer(Connection consumerConnection);
+
+    protected abstract void setConnectionForProducer(Connection producerConnection);
+
     protected void restartProducer() throws JMSException, NamingException {
         try {
             getConnectionForProducer().close();
@@ -180,6 +185,7 @@
         catch (Exception e) {
             log.debug("Ignoring failure to close producer connection: " + e, e);
         }
-        jmsConnector.restartProducerConnection();
+        setConnectionForProducer(jmsConnector.restartProducerConnection());
         createProducer();
     }


