Hi Jean-Baptiste and Tim,

My responses to your very interesting questions are below:

@Jean-Baptiste
> You are using a network of brokers.
Yes exactly.

> Did you try decreaseNetworkPriority on the network connector ?

Nope, I'll try that immediately; we'll see in a few days if it helps.
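
For reference, the attribute that controls this on a network connector is named decreaseNetworkConsumerPriority in the ActiveMQ schema. A minimal sketch of the change, reusing the connector from the configuration quoted further down (host, login and password are the same placeholders; only the last attribute is new):

```xml
<networkConnectors>
  <!-- Same connector as in the posted configuration; only
       decreaseNetworkConsumerPriority has been added. -->
  <networkConnector uri="static:(tcp://myOtherBrokerIP:61616)"
                    userName="mylogin"
                    password="mypassword"
                    prefetchSize="100"
                    duplex="true"
                    decreaseNetworkConsumerPriority="true"/>
</networkConnectors>
```

With this set, the broker gives consumers that sit behind the network bridge a lower dispatch priority than local consumers, so local consumers should be preferred when both are available.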

> By default, messages will be forwarded to another broker only if there's at least an active consumer.

> Don't you have slow consumers identified ?
I don't know. Is JMX the only way to check that?
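
JMX is at least one way to check: as far as I can tell, each subscription MBean exposes a SlowConsumer attribute. A sketch of what would have to change for remote JMX access, assuming the broker should open its own JMX connector (the posted configuration has createConnector="false", and port 1099 is an assumed value):

```xml
<managementContext>
  <!-- connectorPort="1099" is an assumption; any free port works -->
  <managementContext createConnector="true" connectorPort="1099"/>
</managementContext>
```

A tool such as jconsole could then connect to service:jmx:rmi:///jndi/rmi://<broker-host>:1099/jmxrmi and browse the subscriptions under the org.apache.activemq domain. Note that this sketch adds no authentication on the JMX port.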

> By the way, what's the broker URL in the connection factory ? failover ?
Yep, I'm using a failover URL with the URLs of both brokers: failover:(tcp://broker1:61616,tcp://broker2:61616)

> Are you using rebalance client ?
Nope, my topology is static, so I don't think I need that.

> No DLQ ?
What do you mean by this question? I have a DLQ, but I don't think it is relevant here.

> What's the prefetch on client side configuration ?
For the 200 000 messages a day queue, I set a prefetch size of 10.
For the 10 000 messages a day queue, I set a prefetch size of 1.
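
As an illustration of where such a value can live on the client side (a sketch under assumptions, not the actual client code: the bean ids are hypothetical and a Spring XML client configuration is assumed), the queue prefetch can be passed as a jms.* option on the failover URL:

```xml
<!-- Hypothetical client-side beans for the consumers of the
     200 000 messages a day queue. -->
<bean id="jmsConnectionFactory"
      class="org.apache.activemq.ActiveMQConnectionFactory">
  <!-- jms.prefetchPolicy.queuePrefetch applies to every queue
       consumer created from this factory. -->
  <property name="brokerURL"
            value="failover:(tcp://broker1:61616,tcp://broker2:61616)?jms.prefetchPolicy.queuePrefetch=10"/>
</bean>

<bean id="pooledConnectionFactory"
      class="org.apache.activemq.pool.PooledConnectionFactory"
      destroy-method="stop">
  <property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>
```

The prefetch can also be set per consumer through a destination option of the form queueName?consumer.prefetchSize=1, which suits the case where two queues sharing one factory need different values.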

@Tim

> Do messages get stuck in both queues, or only one of them?

Messages get stuck in only one queue; the other one works very well.

To complete the picture: on the second broker there is no networkConnectors section, because of the duplex=true setting.



On 23/01/2020 at 13:42, Jean-Baptiste Onofré wrote:




Regards
JB

On 23/01/2020 11:19, Jérôme Barotin wrote:
Hi,

I've got two AMQ brokers installed in cluster mode with KahaDB, in order
to have an HA configuration. I have configured 2 queues (no topics):

  * one that exchanges 200 000 messages a day, with message groups
    enabled; processing a message takes between 10 and 60 seconds and
    is done by 4 jobs with 15 threads each.
  * another that handles 10 000 messages a day, without message groups;
    it is consumed by 4 jobs with 5 threads each.

Consumers are multithreaded with PooledConnectionFactory.

Everything works well, but sometimes messages get stuck. My
consumers are paused and doing nothing. I can see this in the
ActiveMQ monitoring GUI:

  * The number of pending messages is higher than zero
  * For all consumers, the dispatched queue size is zero

Messages can stay stuck for 1 or 2 hours and then get unblocked by
themselves for no apparent reason. If I restart the brokers, the
messages are unblocked.

I checked the AMQ documentation and a lot of parameters and haven't
found a solution. That's why I'm posting a description of my issue
here. Does anybody have an idea of something to change or something to
monitor that could help me?

The AMQ configuration is the following:

<beans
   xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://www.springframework.org/schema/beans
   http://www.springframework.org/schema/beans/spring-beans.xsd
   http://activemq.apache.org/schema/core
   http://activemq.apache.org/schema/core/activemq-core.xsd">

     <!-- Allows us to use system properties as variables in this configuration file -->
     <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
         <property name="locations">
             <value>file:${activemq.conf}/credentials.properties</value>
         </property>
     </bean>

    <!-- Allows accessing the server log -->
     <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
           lazy-init="false" scope="singleton"
           init-method="start" destroy-method="stop">
     </bean>

     <!--
         The <broker> element is used to configure the ActiveMQ broker.
     -->
     <broker xmlns="http://activemq.apache.org/schema/core"
             brokerName="energysoft" dataDirectory="${activemq.data}" useJmx="true">

         <networkConnectors>
           <networkConnector uri="static:(tcp://myOtherBrokerIP:61616)"
                             userName="mylogin"
                             password="mypassword"
                             prefetchSize="100"
                             duplex="true"
                             />
         </networkConnectors>

         <destinationPolicy>
             <policyMap>
               <policyEntries>
                 <policyEntry queue=">" maxPageSize="10000"
                              maxBrowsePageSize="0" enableAudit="false">
                   <networkBridgeFilterFactory>
                     <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
                   </networkBridgeFilterFactory>
                   <messageGroupMapFactory>
                     <simpleMessageGroupMapFactory/>
                   </messageGroupMapFactory>
                 </policyEntry>
                 <policyEntry topic=">" >
                   <pendingMessageLimitStrategy>
                     <constantPendingMessageLimitStrategy limit="1000"/>
                   </pendingMessageLimitStrategy>
                 </policyEntry>
               </policyEntries>
             </policyMap>
         </destinationPolicy>

         <plugins>
           <simpleAuthenticationPlugin>
               <users>
                   <authenticationUser username="mylogin"
                                       password="mypassword" groups="users,admins"/>
               </users>
           </simpleAuthenticationPlugin>
         </plugins>

         <managementContext>
             <managementContext createConnector="false"/>
         </managementContext>


         <persistenceAdapter>
             <kahaDB directory="${activemq.data}/kahadb"
                     journalMaxFileLength="32mb"/>
         </persistenceAdapter>

         <systemUsage>
             <systemUsage>
                 <memoryUsage>
                     <memoryUsage percentOfJvmHeap="70" />
                 </memoryUsage>
                 <storeUsage>
                     <storeUsage limit="100 gb"/>
                 </storeUsage>
                 <tempUsage>
                     <tempUsage limit="50 gb"/>
                 </tempUsage>
             </systemUsage>
         </systemUsage>

         <transportConnectors>
             <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
             <transportConnector name="openwire"
                                 uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
             <transportConnector name="amqp"
                                 uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
             <transportConnector name="stomp"
                                 uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
             <transportConnector name="mqtt"
                                 uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
             <transportConnector name="ws"
                                 uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

             <!-- cluster port -->
             <!-- <transportConnector uri="tcp://localhost:62002"/> -->
         </transportConnectors>

         <!-- destroy the spring context on shutdown to stop jetty -->
         <shutdownHooks>
             <bean xmlns="http://www.springframework.org/schema/beans"
                   class="org.apache.activemq.hooks.SpringContextHook" />
         </shutdownHooks>
     </broker>

     <import resource="jetty.xml"/>

</beans>


