Hi, "I am having trouble with messages not being delivered after a reconnect. I am using ActiveMQ 5.7. My clients are connecting using the failover transport and ssl." - moodycl
I have posted about this problem here before, but this time I think the cause is different.

A few years back we encountered a problem that looked very similar to this one. This is the original post for that problem:

http://activemq.2283324.n4.nabble.com/Consumers-fail-after-reconnect-td4671583.html

I found the cause and had it resolved here:

https://issues.apache.org/jira/i#browse/AMQ-4746

Now I am seeing a similar problem again, but I think it is being caused by something else. Here is the scenario:

1. I start the client and connect to Server 1.
2. I pull the network cable on Server 1.
3. The inactivity monitor detects the failure and my transport listener is notified.
4. The client then reconnects to Server 2.
5. The transport listener is informed that the connection has been resumed.
6. From this point on messages can be sent but not received.

Inspecting the broker through JMX, there does not appear to be anything wrong. There are no increasing in-flight counts and no errors in the logs.

We recently added BouncyCastle as a security provider, and that is when this problem started. If we remove the call that adds BouncyCastle as a security provider, the problem goes away.

Turning up debug logging on the client shows that when the failover occurs, the unconsumed message lists are not all being cleared.

The broken log looks something like this:

    DEBUG | transport interrupted, dispatchers: 10
    DEBUG | notified failover transport (unconnected) of pending interruption processing for: ID:XXXXXX
    DEBUG | ID:XXXXXX clearing unconsumed list (0) on transport interrupt // 1
    DEBUG | ID:XXXXXX clearing unconsumed list (0) on transport interrupt // 2
    ...
    DEBUG | ID:XXXXXX clearing unconsumed list (0) on transport interrupt // 7
    // Some of the consumers are not cleared, so the count down latch is never triggered
    // After the reconnect, sending works and receiving is broken

The working log looks something like this:

    DEBUG | transport interrupted, dispatchers: 10
    DEBUG | notified failover transport (unconnected) of pending interruption processing for: ID:XXXXXX
    DEBUG | ID:XXXXXX clearing unconsumed list (0) on transport interrupt // 1
    DEBUG | ID:XXXXXX clearing unconsumed list (0) on transport interrupt // 2
    ...
    DEBUG | ID:XXXXXX clearing unconsumed list (0) on transport interrupt // 10
    // All of the consumers get cleared
    DEBUG | transportInterruptionProcessingComplete for: ID:XXXXXX
    DEBUG | notified failover transport () of interruption completion for: ID:XXXXXX
    // After the reconnect, both send and receive work

It looks like there may be a problem with the way the CountDownLatch in ActiveMQConnection is being triggered: the latch is created with a count of 10, but in the broken case only 7 consumers are cleared, so the count never reaches zero. I have been able to work around the problem by adding some additional logic to the transportInterupted method, but I still haven't made the connection to the use of BouncyCastle. It may just be that BouncyCastle has exposed a race condition that was always present.

Any input or help would be appreciated. I am hesitant to add the logic that makes it work without knowing why this has only now become a problem.

Here is the code I added that allows the failover to complete successfully:

    public void transportInterupted() {
        // It looks like the way this count is derived is not being guarded correctly
        this.transportInterruptionProcessingComplete =
            new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1 : 0));
        if (LOG.isDebugEnabled()) {
            LOG.debug("transport interrupted, dispatchers: "
                + transportInterruptionProcessingComplete.getCount());
        }
        signalInterruptionProcessingNeeded();

        // This method will decrement the count
        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
            ActiveMQSession s = i.next();
            s.clearMessagesInProgress();
        }

        // And this method will decrement the count
        for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
            connectionConsumer.clearMessagesInProgress();
        }

        // But these two lists seem unrelated to the way the original count was derived
        // The disposal of the transport happens async and may result in sessions being removed???
        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
            TransportListener listener = iter.next();
            listener.transportInterupted();
        }

        // [CLM] - This is the code that I added. It waits for the latch,
        // but if the latch is not triggered within 10 seconds it proceeds
        // with normal failover operations.
        try {
            LOG.debug("[CLM] Waiting for processing to complete...");
            this.transportInterruptionProcessingComplete.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            LOG.debug("[CLM] InterruptedException");
        } finally {
            if (transportInterruptionProcessingComplete.getCount() > 0) {
                LOG.debug("[CLM] Processing did not complete successfully.");
                this.transportInterruptionProcessingComplete = null;
                FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
                if (failoverTransport != null) {
                    LOG.debug("[CLM] This is a FailoverTransport, so further processing is needed.");
                    failoverTransport.connectionInterruptProcessingComplete(
                        this.getConnectionInfo().getConnectionId());
                    LOG.debug("[CLM] Completed interrupt processing for FailoverTransport.");
                }
            } else {
                LOG.debug("[CLM] Processing should have completed successfully.");
            }
        }
    }

Thanks,
Calvin

--
View this message in context:
http://activemq.2283324.n4.nabble.com/Failover-Transport-Stops-Working-tp4698156.html Sent from the ActiveMQ - User mailing list archive at Nabble.com.
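
P.S. The suspected failure mode can be shown outside ActiveMQ with a minimal sketch. This is not ActiveMQ code: the class LatchMismatchDemo and the method awaitInterruptProcessing are hypothetical names made up for illustration, and the only assumption carried over from the logs above is that the latch is sized for 10 dispatchers while only 7 of them count down. Only the standard java.util.concurrent classes are used.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of ActiveMQ.
class LatchMismatchDemo {

    // Sizes a latch for `expected` participants, lets only `actual` of them
    // count down, then waits with a timeout. Returns true if the latch
    // reached zero, false if the wait timed out.
    static boolean awaitInterruptProcessing(int expected, int actual, long timeoutMillis)
            throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(expected);
        for (int i = 0; i < actual; i++) {
            latch.countDown(); // stands in for one consumer clearing its unconsumed list
        }
        // A bounded wait, like the await(10, TimeUnit.SECONDS) in the workaround above.
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        // Count matches: the latch opens immediately.
        System.out.println("10 of 10 cleared: " + awaitInterruptProcessing(10, 10, 200));
        // Count derived from a list that does not match who counts down:
        // the latch never opens and the wait times out.
        System.out.println("7 of 10 cleared: " + awaitInterruptProcessing(10, 7, 200));
    }
}
```

When the count is derived from one collection and decremented by members of another, an unbounded wait on the latch would block forever, which would be consistent with receive never resuming. The bounded wait only hides that mismatch; it does not explain where the three missing countdowns went.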