My question about the DLQ was whether you see any messages being moved
to the DLQ (because they expired, for instance).
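
To make that easier to check, here is a minimal sketch (not taken from
your configuration): an individualDeadLetterStrategy inside the existing
queue policyEntry gives each queue its own DLQ, so expired messages show
up per queue in the console. The "DLQ." prefix is only an illustrative
choice, and processExpired="true" is already the default; it is spelled
out here for clarity.

```xml
<!-- Sketch only: goes inside the existing <policyEntry queue=">"> element -->
<deadLetterStrategy>
  <!-- One dead-letter queue per source queue, named DLQ.<queueName> -->
  <individualDeadLetterStrategy queuePrefix="DLQ."
                                useQueueForQueueMessages="true"
                                processExpired="true"/>
</deadLetterStrategy>
```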

Anyway, I would first try:
1. enabling decreaseNetworkConsumerPriority on the network connector, so
   messages are consumed locally where possible and the bridge is used less
2. increasing the prefetch on the bridge (the network connector)
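
As a sketch of both points applied to the connector from your
configuration: the attribute for point 1 is
decreaseNetworkConsumerPriority, and the prefetchSize value below is
only an illustrative starting point, not a tested recommendation.

```xml
<networkConnectors>
  <!-- Sketch only: the connector from the original config with two changes -->
  <networkConnector uri="static:(tcp://myOtherBrokerIP:61616)"
                    userName="mylogin"
                    password="mypassword"
                    duplex="true"
                    decreaseNetworkConsumerPriority="true"
                    prefetchSize="1000"/>
  <!-- decreaseNetworkConsumerPriority: prefer local consumers over the bridge -->
  <!-- prefetchSize: raised from 100; tune it against your own load -->
</networkConnectors>
```

As far as I know, with duplex="true" the same settings are used for both
directions of the bridge, so only this broker's file should need the
change.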

Regards
JB

On 23/01/2020 14:52, Jérôme Barotin wrote:
> Hi Jean-Baptiste and Tim,
> 
> My answers to your very interesting questions are below:
> 
> @Jean Baptiste
>> You are using a network of brokers.
> Yes exactly.
> 
>> Did you try decreaseNetworkConsumerPriority on the network connector?
> 
> Nope, I'll try that immediately; we'll see in a few days if it helps.
> 
>> By default, messages will be forwarded to another broker only if
>> there's at least one active consumer.
> 
>> Have you identified any slow consumers?
> I don't know. Is JMX the only way to check that?
> 
>> By the way, what's the broker URL in the connection factory? failover?
> Yep, I'm using a failover URL with the URLs of both brokers
> ( failover:(tcp://broker1:61616,tcp://broker2:61616) ).
> 
>> Are you using client rebalancing?
> Nope, my topology is static, so I don't think I need that.
> 
>> No DLQ?
> What do you mean by this question? I do have a DLQ, but I don't think
> it's relevant here.
> 
>> What's the prefetch on client side configuration ?
> For the 200 000 messages a day queue, I set a prefetch size of 10
> For the 10 000 messages a day queue, I set a prefetch size of 1
> 
> @Tim
> 
>> Do messages get stuck in both queues, or only one of them?
> 
> Messages get stuck in only one queue; the other one works very well.
> 
> To be complete: on the second broker there is no networkConnectors
> section, because of duplex=true.
> 
> 
> 
> On 23/01/2020 13:42, Jean-Baptiste Onofré wrote:
> 
> 
>>
> 
>> Regards
>> JB
>>
>> On 23/01/2020 11:19, Jérôme Barotin wrote:
>>> Hi,
>>>
>>> I've got two AMQ brokers installed in cluster mode with KahaDB, in order
>>> to have an HA configuration. I have configured 2 queues (no topics):
>>>
>>>   * one that exchanges 200 000 messages per day; message groups are
>>>     enabled, processing a message takes between 10 and 60 seconds, and
>>>     it's consumed by 4 jobs with 15 threads each.
>>>   * another that handles 10 000 messages per day, with no message
>>>     groups; it's consumed by 4 jobs with 5 threads each.
>>>
>>> Consumers are multithreaded with PooledConnectionFactory.
>>>
>>> Everything works well, but sometimes messages get stuck. My
>>> consumers are idle and do nothing. I can see this in the
>>> ActiveMQ monitoring GUI:
>>>
>>>   * The number of pending messages is higher than zero
>>>   * For every consumer, the dispatched queue size is zero
>>>
>>> Messages can stay stuck for 1 or 2 hours and then get unblocked by
>>> themselves for no apparent reason. Restarting the brokers also
>>> unblocks them.
>>>
>>> I checked the AMQ documentation and a lot of parameters and haven't
>>> found a solution. That's why I'm posting a description of my issue
>>> here. Does anybody have an idea of something to change or something
>>> to monitor that could help me?
>>>
>>> The AMQ configuration is the following :
>>>
>>> <beans
>>>    xmlns="http://www.springframework.org/schema/beans"
>>>    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>>    xsi:schemaLocation="http://www.springframework.org/schema/beans
>>> http://www.springframework.org/schema/beans/spring-beans.xsd
>>>    http://activemq.apache.org/schema/core
>>> http://activemq.apache.org/schema/core/activemq-core.xsd">
>>>
>>>      <!-- Allows us to use system properties as variables in this
>>> configuration file -->
>>>      <bean
>>> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>>>
>>>
>>>          <property name="locations">
>>> <value>file:${activemq.conf}/credentials.properties</value>
>>>          </property>
>>>      </bean>
>>>
>>>     <!-- Allows accessing the server log -->
>>>      <bean id="logQuery"
>>> class="io.fabric8.insight.log.log4j.Log4jLogQuery"
>>>            lazy-init="false" scope="singleton"
>>>            init-method="start" destroy-method="stop">
>>>      </bean>
>>>
>>>      <!--
>>>          The <broker> element is used to configure the ActiveMQ broker.
>>>      -->
>>>      <broker xmlns="http://activemq.apache.org/schema/core"
>>> brokerName="energysoft" dataDirectory="${activemq.data}" useJmx="true">
>>>
>>>          <networkConnectors>
>>>            <networkConnector uri="static:(tcp://myOtherBrokerIP:61616)"
>>>                              userName="mylogin"
>>>                              password="mypassword"
>>>                              prefetchSize="100"
>>>                              duplex="true"
>>>                              />
>>>          </networkConnectors>
>>>
>>>          <destinationPolicy>
>>>              <policyMap>
>>>                <policyEntries>
>>>                      <policyEntry queue=">" maxPageSize="10000"
>>> maxBrowsePageSize="0" enableAudit="false" >
>>>                    <networkBridgeFilterFactory>
>>>                      <conditionalNetworkBridgeFilterFactory
>>> replayWhenNoConsumers="true"/>
>>>                    </networkBridgeFilterFactory>
>>>                    <messageGroupMapFactory>
>>>                      <simpleMessageGroupMapFactory/>
>>>                    </messageGroupMapFactory>
>>>                  </policyEntry>
>>>                  <policyEntry topic=">" >
>>>                    <pendingMessageLimitStrategy>
>>>                      <constantPendingMessageLimitStrategy limit="1000"/>
>>>                    </pendingMessageLimitStrategy>
>>>                  </policyEntry>
>>>                </policyEntries>
>>>              </policyMap>
>>>          </destinationPolicy>
>>>
>>>          <plugins>
>>>            <simpleAuthenticationPlugin>
>>>                <users>
>>>                    <authenticationUser username="mylogin"
>>> password="mypassword" groups="users,admins"/>
>>>                </users>
>>>            </simpleAuthenticationPlugin>
>>>          </plugins>
>>>
>>>          <managementContext>
>>>              <managementContext createConnector="false"/>
>>>          </managementContext>
>>>
>>>
>>>          <persistenceAdapter>
>>>              <kahaDB directory="${activemq.data}/kahadb"
>>> journalMaxFileLength="32mb"/>
>>>          </persistenceAdapter>
>>>
>>>            <systemUsage>
>>>              <systemUsage>
>>>                  <memoryUsage>
>>>                      <memoryUsage percentOfJvmHeap="70" />
>>>                  </memoryUsage>
>>>                  <storeUsage>
>>>                      <storeUsage limit="100 gb"/>
>>>                  </storeUsage>
>>>                  <tempUsage>
>>>                      <tempUsage limit="50 gb"/>
>>>                  </tempUsage>
>>>              </systemUsage>
>>>          </systemUsage>
>>>
>>>          <transportConnectors>
>>>              <!-- DOS protection, limit concurrent connections to 1000
>>> and frame size to 100MB -->
>>>              <transportConnector name="openwire"
>>> uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"
>>>
>>> />
>>>              <transportConnector name="amqp"
>>> uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>>>
>>>
>>>              <transportConnector name="stomp"
>>> uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>>>
>>>
>>>              <transportConnector name="mqtt"
>>> uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>>>
>>>
>>>              <transportConnector name="ws"
>>> uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>>>
>>>
>>>              <!-- cluster port -->
>>>              <!-- <transportConnector uri="tcp://localhost:62002"/> -->
>>>          </transportConnectors>
>>>
>>>          <!-- destroy the spring context on shutdown to stop jetty -->
>>>          <shutdownHooks>
>>>              <bean xmlns="http://www.springframework.org/schema/beans"
>>> class="org.apache.activemq.hooks.SpringContextHook" />
>>>          </shutdownHooks>
>>>      </broker>
>>>
>>>      <import resource="jetty.xml"/>
>>>
>>> </beans>
>>>
>>>
> 

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
