Hi Tim,

Yes, I'm aware of this section in the documentation. The |replayWhenNoConsumers="true" is positionned|since several month, but this doesn't change anything.

I'll consider your advice of changing the networkTTL setting, I've got a default value of 1, an higher value will avoid fail back mistake.

Anyway, since last Thursday, I have set the decreaseNetworkConsumerPriority="true" in the NetworkConnector, and it works very well. I haven't observed message stuck since this change. If it still the case Friday this week, I'll update the "message stuck" part in the documentation.

We're staying in touch,

Jérôme

Le 25/01/2020 à 14:29, Tim Bain a écrit :

Have you read the Stuck Messages section of
https://activemq.apache.org/networks-of-brokers and applied one of the
solutions it describes (I recommend the replayWhenNoConsumers=true solution
because it works no matter how many consumers you have). You'll also need
to ensure that your TTL setting for the Network of Brokers (which is not
the same as the TTL on an individual message) is high enough to allow
messages to hop between the brokers several times in case the consumer
fails back and forth several times. I'd think that a value of 10 would be
more than sufficient.

Tim

On Thu, Jan 23, 2020, 7:29 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

My question about DLQ is if you see some messages moved in DLQ (because
they expired for instance).

Anyway, I would try first:
1. decrease network priority to consume locally and avoid use of the bridge
2. increase the prefetch on the bridge (the network connector)

Regards
JB

On 23/01/2020 14:52, Jérôme Barotin wrote:
Hi Jean Baptiste and Tim ,

My response to your very interesting questions, below :

@Jean Baptiste
You are using a network of brokers.
Yes exactly.

Did you try decreaseNetworkPriority on the network connector ?
Nope I'll try that immediatly, we'll see in few days if it .

By default, messages will be forwarded to another broker only if
there's at least an active consumer.

Don't you have slow consumers identified ?
I don't know, the only way to check that is to use JMX ?

By the way, what's the broker URL in the connection factory ? failover ?
Yep, I'm using failover url with url of both brocker.  (
failover:(tcp://broker1:61616,tcp://broker2:61616) )

Are you using rebalance client ?
Nope, my topology is static, I think I don't need to do that.

No DLQ ?
What do you mean by this question, I have a DLQ, it's not the purpose
here, I think ?.

What's the prefetch on client side configuration ?
For the 200 000 messages a day queue, I set a prefetch size of 10
For the 10 000 messages a day queue, I set a prefetch size of 1

@Tim

Do messages get stuck in both queues, or only one of them?
Message stuck only in one queue, the other one works very well.

To complete, in the second brocker, there's no networkConnectors part,
cause, of the duplex=true.



Le 23/01/2020 à 13:42, Jean-Baptiste Onofré a écrit :


Regards
JB

On 23/01/2020 11:19, Jérôme Barotin wrote:
Hi,

I've got two AMQ brokers installed in cluster mode with KahaDB, in
order
to have an HA configuration. I have configured 2 queues (no topic) :

   * one that exchange 200 000 messages by day, message group is
enabled,
     time to process a message goes between 10 to 60 seconds, it's done
     by 4 job with 15 threads each.
   * one others deal with 10 000 messages by day and there no group,
it's
     consumed by 4 job with 5 thread each.

Consumers are multithreaded with PooledConnectionFactory.

All is working well but some time there messages that stuck. My
consumers are paused and are doing nothing. I can see that, on the
ActiveMq monitoring GUI :

   * Number of pending message is higher than zero
   * All consumer, Dispatched queue is setted to zero

Message can stuck 1 or 2 hours and get unblocked by theirself without
explicit reason, if I restart brockers : it unblocks messages.

I checked AMQ documentation and lot of parameters and haven't found a
solution. That's why I'm posting here a description of my issue. Is
anybody have an idea of a something to change or something to monitor,
in order to help me ?

The AMQ configuration is the following :

<beans
    xmlns="http://www.springframework.org/schema/beans";
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
    xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
    http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd";>

      <!-- Allows us to use system properties as variables in this
configuration file -->
      <bean

class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">

          <property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
          </property>
      </bean>

     <!-- Allows accessing the server log -->
      <bean id="logQuery"
class="io.fabric8.insight.log.log4j.Log4jLogQuery"
            lazy-init="false" scope="singleton"
            init-method="start" destroy-method="stop">
      </bean>

      <!--
          The <broker> element is used to configure the ActiveMQ broker.
      -->
      <broker xmlns="http://activemq.apache.org/schema/core";
brokerName="energysoft" dataDirectory="${activemq.data}" useJmx="true">

          <networkConnectors>
            <networkConnector uri="static:(tcp://myOtherBrokerIP:61616)"
                              userName="mylogin"
                              password="mypassword"
                              prefetchSize="100"
                              duplex="true"
                              />
          </networkConnectors>

          <destinationPolicy>
              <policyMap>
                <policyEntries>
                      <policyEntry queue=">" maxPageSize="10000"
maxBrowsePageSize="0" enableAudit="false" >
                    <networkBridgeFilterFactory>
                      <conditionalNetworkBridgeFilterFactory
replayWhenNoConsumers="true"/>
                    </networkBridgeFilterFactory>
                    <messageGroupMapFactory>
                      <simpleMessageGroupMapFactory/>
                    </messageGroupMapFactory>
                  </policyEntry>
                  <policyEntry topic=">" >
                    <pendingMessageLimitStrategy>
                      <constantPendingMessageLimitStrategy
limit="1000"/>
                    </pendingMessageLimitStrategy>
                  </policyEntry>
                </policyEntries>
              </policyMap>
          </destinationPolicy>

          <plugins>
            <simpleAuthenticationPlugin>
                <users>
                    <authenticationUser username="mylogin"
password="mypassword" groups="users,admins"/>
                </users>
            </simpleAuthenticationPlugin>
          </plugins>

          <managementContext>
              <managementContext createConnector="false"/>
          </managementContext>


          <persistenceAdapter>
              <kahaDB directory="${activemq.data}/kahadb"
journalMaxFileLength="32mb"/>
          </persistenceAdapter>

            <systemUsage>
              <systemUsage>
                  <memoryUsage>
                      <memoryUsage percentOfJvmHeap="70" />
                  </memoryUsage>
                  <storeUsage>
                      <storeUsage limit="100 gb"/>
                  </storeUsage>
                  <tempUsage>
                      <tempUsage limit="50 gb"/>
                  </tempUsage>
              </systemUsage>
          </systemUsage>

          <transportConnectors>
              <!-- DOS protection, limit concurrent connections to 1000
and frame size to 100MB -->
              <transportConnector name="openwire"
uri="tcp://
0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600
"
/>
              <transportConnector name="amqp"
uri="amqp://
0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600
"/>

              <transportConnector name="stomp"
uri="stomp://
0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600
"/>

              <transportConnector name="mqtt"
uri="mqtt://
0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600
"/>

              <transportConnector name="ws"
uri="ws://
0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600
"/>

              <!-- cluster port -->
              <!-- <transportConnector uri="tcp://localhost:62002"/> -->
          </transportConnectors>

          <!-- destroy the spring context on shutdown to stop jetty -->
          <shutdownHooks>
              <bean xmlns="http://www.springframework.org/schema/beans";
class="org.apache.activemq.hooks.SpringContextHook" />
          </shutdownHooks>
      </broker>

      <import resource="jetty.xml"/>

</beans>


--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Reply via email to