All:
I have done quite a bit of reading: the ActiveMQ documentation, the ActiveMQ in
Action book (MEAP), the ActiveMQ forum, and many other internet postings.
We are using ActiveMQ 5.3.1 in production and recently experienced a case where
849 messages were duplicated within a 70-second period.
My environment:
Linux; JDK 1.6; ActiveMQ 5.3.1; Camel 2.4
My current configuration is shared-nothing master/slave. Here's the master
config:
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
    xmlns="http://www.springframework.org/schema/beans"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core.xsd">
    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.base}/conf/credentials.properties</value>
        </property>
    </bean>
    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core"
            brokerName="emsp-primary"
            dataDirectory="${activemq.base}/data"
            destroyApplicationContextOnStop="true">
        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry topic=">" producerFlowControl="true"
                                 memoryLimit="5mb" useCache="false">
                        <pendingSubscriberPolicy>
                            <vmCursor />
                        </pendingSubscriberPolicy>
                    </policyEntry>
                    <policyEntry queue=">" producerFlowControl="true"
                                 memoryLimit="5mb" useCache="false">
                        <!-- Use VM cursor for better latency
                             For more information, see:
                             http://activemq.apache.org/message-cursors.html
                        <pendingQueuePolicy>
                            <vmQueueCursor/>
                        </pendingQueuePolicy>
                        -->
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>
        <networkConnectors>
        </networkConnectors>
        <persistenceAdapter>
            <kahaDB directory="${activemq.base}/data/kahadb"
                    indexWriteBatchSize="100"
                    journalMaxFileLength="33554432"
                    enableIndexWriteAsync="true"
                    enableJournalDiskSyncs="false" />
        </persistenceAdapter>
        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="512 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="2 gb"
                                name="data-store"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="100 mb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>
        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
        </transportConnectors>
    </broker>
    <import resource="jetty.xml"/>
</beans>
And the failover config:
<beans
    xmlns="http://www.springframework.org/schema/beans"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core.xsd">
    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.base}/conf/credentials.properties</value>
        </property>
    </bean>
    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core"
            brokerName="emsp-failover"
            dataDirectory="${activemq.base}/data"
            destroyApplicationContextOnStop="true">
        <!--
            For better performances use VM cursor and small memory limit.
            For more information, see:
            http://activemq.apache.org/message-cursors.html
            Also, if your producer is "hanging", it's probably due to producer
            flow control.
            For more information, see:
            http://activemq.apache.org/producer-flow-control.html
        -->
        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry topic=">" producerFlowControl="true"
                                 memoryLimit="5mb" useCache="false">
                        <pendingSubscriberPolicy>
                            <vmCursor />
                        </pendingSubscriberPolicy>
                    </policyEntry>
                    <policyEntry queue=">" producerFlowControl="true"
                                 memoryLimit="5mb" useCache="false">
                        <!-- Use VM cursor for better latency
                             For more information, see:
                             http://activemq.apache.org/message-cursors.html
                        <pendingQueuePolicy>
                            <vmQueueCursor/>
                        </pendingQueuePolicy>
                        -->
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>
        <!--
            The managementContext is used to configure how ActiveMQ is exposed in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:
            http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>
        <!-- The store and forward broker networks ActiveMQ will listen to -->
        <networkConnectors>
            <networkConnector name="bridge-to-primary"
                              uri="static:(tcp://10.9.8.7:61616)"
                              dynamicOnly="false"
                              duplex="true"
                              prefetchSize="1">
                <excludedDestinations>
                    <queue physicalName="Consumer.*.VirtualTopic.>"/>
                </excludedDestinations>
                <staticallyIncludedDestinations>
                    <topic physicalName="VirtualTopic.MyCompany.SharedServices.DirectoryServices"/>
                </staticallyIncludedDestinations>
            </networkConnector>
        </networkConnectors>
        <persistenceAdapter>
            <kahaDB directory="${activemq.base}/data/kahadb"
                    indexWriteBatchSize="100"
                    journalMaxFileLength="33554432"
                    enableIndexWriteAsync="true"
                    enableJournalDiskSyncs="false" />
        </persistenceAdapter>
        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="512 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="2 gb"
                                name="data-store"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="100 mb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>
        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
        </transportConnectors>
    </broker>
    <import resource="jetty.xml"/>
</beans>
Finally, here are portions of the ActiveMQ log files:
Log Messages from Primary
2011-02-16 05:18:39,197 | WARN | Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: MessageAck {commandId = 1166, responseRequired = false, ackType = 2, consumerId = ID:prod-interop-mq2.mycompany.com-52485-1296046376681-0:4:3:1, firstMessageId = ID:prod-interop-mq1.mycompany.com-48185-1296047391209-0:2:3:1540:1, lastMessageId = ID:prod-interop-mq1.mycompany.com-48185-1296047391209-0:2:3:1540:1, destination = queue://MyCompany.CAM.Import, transactionId = null, messageCount = 1} | org.apache.activemq.broker.region.PrefetchSubscription | ActiveMQ Transport: tcp:///10.9.8.7:37151
2011-02-16 05:18:45,861 | INFO | Network connection between vm://emsp-failover#0 and tcp:///10.9.8.7:61616 shutdown due to a local error: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long: /10.9.8.7:61616 | org.apache.activemq.network.DemandForwardingBridge | InactivityMonitor Async Task: java.util.concurrent.ThreadPoolExecutor$Worker@e75737
2011-02-16 05:18:45,905 | INFO | Connector vm://emsp-failover Stopped | org.apache.activemq.broker.TransportConnector | InactivityMonitor Async Task: java.util.concurrent.ThreadPoolExecutor$Worker@e75737
2011-02-16 05:18:45,905 | INFO | emsp-failover bridge to emsp-primary stopped | org.apache.activemq.network.DemandForwardingBridge | InactivityMonitor Async Task: java.util.concurrent.ThreadPoolExecutor$Worker@e75737
2011-02-16 05:18:45,908 | INFO | Establishing network connection from vm://emsp-failover to tcp://10.9.8.7:61616 | org.apache.activemq.network.DiscoveryNetworkConnector | Simple Discovery Agent: java.util.concurrent.ThreadPoolExecutor$Worker@504fe4
2011-02-16 05:18:45,911 | INFO | Connector vm://emsp-failover Started | org.apache.activemq.broker.TransportConnector | Simple Discovery Agent: java.util.concurrent.ThreadPoolExecutor$Worker@504fe4
2011-02-16 05:19:30,738 | INFO | Usage Manager Memory Limit reached on queue://MyCompany.CAM.Import. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info | org.apache.activemq.broker.region.Queue | ActiveMQ Transport: tcp:///10.9.8.8:44634
2011-02-16 05:19:38,877 | INFO | Network connection between vm://emsp-failover#6 and tcp:///10.9.8.7:61616(emsp-primary) has been established. | org.apache.activemq.network.DemandForwardingBridge | StartLocalBridge: localBroker=vm://emsp-failover#6
Log Messages from Failover
2011-02-16 05:19:30,361 | WARN | Caught an exception processing local command | org.apache.activemq.network.DemandForwardingBridge | ActiveMQ Connection Dispatcher: vm://emsp-primary#6
org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long: /10.9.8.8:49724
    at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:235)
    at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:83)
    at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104)
    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
    at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
    at org.apache.activemq.network.DemandForwardingBridgeSupport.serviceLocalCommand(DemandForwardingBridgeSupport.java:702)
    at org.apache.activemq.network.DemandForwardingBridgeSupport$1.onCommand(DemandForwardingBridgeSupport.java:158)
    at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
    at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
    at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:118)
    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
    at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
    at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1205)
    at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:790)
    at org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:826)
    at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:98)
    at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36)
2011-02-16 05:19:30,334 | WARN | Network connection between vm://emsp-primary#6 and tcp:///10.9.8.8:49724 shutdown due to a remote error: java.net.SocketException: Broken pipe | org.apache.activemq.network.DemandForwardingBridge | InactivityMonitor Async Task: java.util.concurrent.ThreadPoolExecutor$Worker@9ec332
2011-02-16 05:19:28,418 | WARN | KahaDB PageFile flush: 6 queued writes, latch wait took 732 | org.apache.kahadb.page.PageFile | ActiveMQ Journal Checkpoint Worker
2011-02-16 05:19:27,385 | INFO | Slow KahaDB access: Journal append took: 411 ms, Index update took 2368 ms | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Transport: tcp:///10.9.8.7:60899
2011-02-16 05:19:30,915 | INFO | Network connection between vm://emsp-primary#6 and tcp:///10.9.8.8:49724 shutdown due to a local error: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long: /10.9.8.8:49724 | org.apache.activemq.network.DemandForwardingBridge | ActiveMQ Connection Dispatcher: vm://emsp-primary#6
2011-02-16 05:19:32,153 | INFO | Slow KahaDB access: cleanup took 9200 | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Journal Checkpoint Worker
2011-02-16 05:19:33,470 | INFO | Slow KahaDB access: Journal append took: 0 ms, Index update took 7002 ms | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Transport: tcp:///10.9.8.8:47708
2011-02-16 05:19:33,887 | WARN | KahaDB PageFile flush: 4 queued writes, latch wait took 410 | org.apache.kahadb.page.PageFile | ActiveMQ Journal Checkpoint Worker
2011-02-16 05:19:38,048 | INFO | Connector vm://emsp-primary Stopped | org.apache.activemq.broker.TransportConnector | ActiveMQ Task
2011-02-16 05:19:38,068 | INFO | emsp-primary bridge to emsp-failover stopped | org.apache.activemq.network.DemandForwardingBridge | ActiveMQ Task
2011-02-16 05:19:38,048 | INFO | The connection to '/10.9.8.8:49724' is taking a long time to shutdown. | org.apache.activemq.broker.TransportConnection | NetworkBridge
2011-02-16 05:19:52,288 | INFO | Connector vm://emsp-primary Started | org.apache.activemq.broker.TransportConnector | ActiveMQ Transport: tcp:///10.9.8.8:38626
2011-02-16 05:19:55,082 | INFO | Slow KahaDB access: cleanup took 1839 | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Journal Checkpoint Worker
2011-02-16 05:19:55,554 | INFO | Created Duplex Bridge back to emsp-failover | org.apache.activemq.broker.TransportConnection | ActiveMQ Transport: tcp:///10.9.8.8:38626
2011-02-16 05:19:55,641 | INFO | Network connection between vm://emsp-primary#8 and tcp:///10.9.8.8:38626(emsp-failover) has been established. | org.apache.activemq.network.DemandForwardingBridge | StartLocalBridge: localBroker=vm://emsp-primary#8
2011-02-16 05:19:59,225 | INFO | Slow KahaDB access: cleanup took 610 | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Journal Checkpoint Worker
I should point out that all clients connect using the failover transport with
randomize=false. In other words, our second broker is there only to provide
failover, not load balancing.
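To make the client setup above concrete, here is a minimal sketch of how such a connection URL can be assembled. The class and method names (`FailoverUrl`, `build`) are hypothetical, and the second broker address (10.9.8.8:61616) is an assumption inferred from the logs; only the primary address appears in the configs. No ActiveMQ classes are used, the code just builds the URL string.

```java
import java.util.Arrays;
import java.util.List;

public class FailoverUrl {
    // Builds a failover transport URL from an ordered broker list.
    // With randomize=false the client tries the brokers in the order given,
    // so the first entry acts as the preferred broker.
    public static String build(List<String> brokers) {
        return "failover:(" + String.join(",", brokers) + ")?randomize=false";
    }

    public static void main(String[] args) {
        // 10.9.8.7 is the primary from the network connector config;
        // 10.9.8.8 is ASSUMED to be the failover broker on the same port.
        List<String> brokers = Arrays.asList(
                "tcp://10.9.8.7:61616",
                "tcp://10.9.8.8:61616");
        System.out.println(build(brokers));
    }
}
```

The resulting string is what would be handed to the client's connection factory as its broker URL.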
The reliability and availability (i.e. failover) of ActiveMQ have been
outstanding. However, producing (and then processing) 849 duplicate messages is
unacceptable.
Is the (occasional) production of duplicate messages known and expected
behavior?
Is there something in my configuration that can be changed to prevent
duplicate messages?
Would using a "shared database" strategy prevent this, since only one broker
can be active at a time? It would also give us a much simpler setup, i.e. all
brokers could use the same configuration file.
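For context, a common consumer-side safeguard against replayed messages is to skip any message whose JMS message ID has already been processed. The sketch below shows that idea in plain Java. The class name `SeenMessageIds`, the method `firstTime`, and the capacity of 10000 are all hypothetical, and no ActiveMQ or Camel API is involved. It assumes a replayed message keeps its original message ID.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SeenMessageIds {
    private final Map<String, Boolean> seen;

    public SeenMessageIds(final int capacity) {
        // Insertion-ordered map that drops its oldest entry once it grows
        // beyond the capacity, so memory use stays bounded.
        this.seen = new LinkedHashMap<String, Boolean>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    // Returns true the first time an ID is offered and false if the ID is
    // still remembered, i.e. the message is a replay.
    public synchronized boolean firstTime(String messageId) {
        return seen.put(messageId, Boolean.TRUE) == null;
    }

    public static void main(String[] args) {
        SeenMessageIds ids = new SeenMessageIds(10000);
        // Message ID taken from the ignored-ack log entry above.
        String id = "ID:prod-interop-mq1.mycompany.com-48185-1296047391209-0:2:3:1540:1";
        System.out.println(ids.firstTime(id)); // true: first delivery, process it
        System.out.println(ids.firstTime(id)); // false: replay, skip it
    }
}
```

An in-memory set like this is lost on restart and is not shared between consumer processes, so it only narrows the window for duplicates. Camel's Idempotent Consumer pattern addresses the same problem with pluggable repositories.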
Any other suggestions/advice would be quite welcome!
TIA,
Mike L. (aka patzerbud)