Hello,

Looking into the code I think this could be a bug:


I can see that AMQPMessage.getAddress() can return null:


   @Override
   public String getAddress() {
      if (address == null) {
         Properties properties = getProtonMessage().getProperties();
         if (properties != null) {
            return properties.getTo();
         } else {
            return null;   // <-- returns null
         }
      } else {
         return address;
      }
   }


However, ServerSessionImpl.send() does not check for null, although it
should. SimpleString.toSimpleString() returns null if its input is null,
so the address obtained here can be null:


      SimpleString address = message.getAddressSimpleString();   // <-- can be null

      if (defaultAddress == null && address != null) {
         defaultAddress = address;
      }

      if (address == null) {
         // We don't want to force a re-encode when the message gets sent to the consumer
         message.setAddress(defaultAddress);
      }



SimpleString:

   public static SimpleString toSimpleString(final String string) {
      if (string == null) {
         return null;   // <-- null in, null out
      }
      return new SimpleString(string);
   }
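
To make the suspected failure path concrete, here is a minimal,
self-contained sketch. It does not use the Artemis classes: FakeMessage,
sendUnguarded() and sendGuarded() are hypothetical stand-ins, plain Strings
replace SimpleString, and I am assuming that setAddress() dereferences its
argument (which is what the stack trace suggests, but I have not verified
it). Throwing an exception in the guarded variant is only one possible
reaction; what the broker should really do in that case is an open question.

```java
public class NullAddressSketch {

    /** Hypothetical stand-in for a message; NOT the real AMQPMessage. */
    static final class FakeMessage {
        private String address;   // may be null, like getAddress() above

        FakeMessage(String address) {
            this.address = address;
        }

        String getAddress() {
            return address;
        }

        void setAddress(String newAddress) {
            // Assumption: the setter dereferences its argument,
            // so a null argument raises a NullPointerException.
            this.address = newAddress.toString();
        }
    }

    /** Mirrors the quoted send() logic: no null check before setAddress(). */
    static void sendUnguarded(FakeMessage message, String defaultAddress) {
        String address = message.getAddress();
        if (defaultAddress == null && address != null) {
            defaultAddress = address;
        }
        if (address == null) {
            // Fails if defaultAddress is null as well.
            message.setAddress(defaultAddress);
        }
    }

    /** Same logic with an explicit guard (one possible reaction, not a proposed patch). */
    static void sendGuarded(FakeMessage message, String defaultAddress) {
        String address = message.getAddress();
        if (defaultAddress == null && address != null) {
            defaultAddress = address;
        }
        if (address == null) {
            if (defaultAddress == null) {
                throw new IllegalStateException(
                    "message has no address and no default address is set");
            }
            message.setAddress(defaultAddress);
        }
    }

    public static void main(String[] args) {
        try {
            sendUnguarded(new FakeMessage(null), null);
        } catch (NullPointerException e) {
            System.out.println("unguarded: NullPointerException");
        }
        try {
            sendGuarded(new FakeMessage(null), null);
        } catch (IllegalStateException e) {
            System.out.println("guarded: " + e.getMessage());
        }
    }
}
```

In this sketch the unguarded variant only fails when both the message
address and the default address are null; with a non-null default address
both variants simply copy it onto the message.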


What do you think?

Kind Regards,
Andreas

On Thu, Dec 7, 2017 at 10:06 AM, andi welchlin <andi.welch...@gmail.com>
wrote:

> Hello all,
>
> I built an active/active cluster with two brokers for my first tests. I can
> see in the log that the two brokers formed a cluster. After startup
> everything looks fine.
>
> I configured one queue and sent a message using AMQP 1.0 to broker1 and
> tried to read it from broker2.
>
> But broker2 throws an exception and the message is not sent to the client:
>
> 09:34:53,146 ERROR [org.apache.activemq.artemis.core.server] AMQ224016: Caught exception: java.lang.NullPointerException
>         at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:612) [artemis-amqp-protocol-2.4.0.jar:]
>         at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:63) [artemis-amqp-protocol-2.4.0.jar:]
>         at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1368) [artemis-server-2.4.0.jar:2.4.0]
>         at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1311) [artemis-server-2.4.0.jar:2.4.0]
>         at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1304) [artemis-server-2.4.0.jar:2.4.0]
>         at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onSessionSend(ServerSessionPacketHandler.java:690) [artemis-server-2.4.0.jar:2.4.0]
>         at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onMessagePacket(ServerSessionPacketHandler.java:290) [artemis-server-2.4.0.jar:2.4.0]
>         at org.apache.activemq.artemis.utils.actors.Actor.doTask(Actor.java:33) [artemis-commons-2.4.0.jar:2.4.0]
>         at org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53) [artemis-commons-2.4.0.jar:2.4.0]
>         at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) [artemis-commons-2.4.0.jar:2.4.0]
>         at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) [artemis-commons-2.4.0.jar:2.4.0]
>         at org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53) [artemis-commons-2.4.0.jar:2.4.0]
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [rt.jar:1.8.0_151]
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [rt.jar:1.8.0_151]
>         at java.lang.Thread.run(Thread.java:748) [rt.jar:1.8.0_151]
>
>
> The queue is not created on the fly but configured in the brokers, both
> brokers have this config:
>
>   <queues>
>        <queue name="awe.test.queue">
>           <address>awe.test.queue</address>
>           <durable>true</durable>
>         </queue>
>   </queues>
>
>
> The cluster configuration is pretty much copied from the examples:
>
>       <!-- Clustering configuration -->
>       <broadcast-groups>
>          <broadcast-group name="my-broadcast-group">
>             <group-address>${udp-address:231.7.7.7}</group-address>
>             <group-port>9876</group-port>
>             <broadcast-period>100</broadcast-period>
>             <connector-ref>netty-connector</connector-ref>
>          </broadcast-group>
>       </broadcast-groups>
>
>       <discovery-groups>
>          <discovery-group name="my-discovery-group">
>             <group-address>${udp-address:231.7.7.7}</group-address>
>             <group-port>9876</group-port>
>             <refresh-timeout>10000</refresh-timeout>
>          </discovery-group>
>       </discovery-groups>
>
>       <cluster-connections>
>          <cluster-connection name="my-cluster">
>             <connector-ref>netty-connector</connector-ref>
>             <retry-interval>500</retry-interval>
>             <use-duplicate-detection>true</use-duplicate-detection>
>             <message-load-balancing>STRICT</message-load-balancing>
>             <max-hops>1</max-hops>
>             <discovery-group-ref discovery-group-name="my-discovery-group"/>
>          </cluster-connection>
>       </cluster-connections>
>
>       <!-- a colocated server that will allow shared store full backups to be requested-->
>       <ha-policy>
>          <shared-store>
>             <colocated>
>                <backup-port-offset>100</backup-port-offset>
>                <backup-request-retries>-1</backup-request-retries>
>                <backup-request-retry-interval>2000</backup-request-retry-interval>
>                <max-backups>1</max-backups>
>                <request-backup>true</request-backup>
>                <master>
>                   <failover-on-shutdown>true</failover-on-shutdown>
>                </master>
>                <slave>
>                   <failover-on-shutdown>true</failover-on-shutdown>
>                </slave>
>             </colocated>
>          </shared-store>
>       </ha-policy>
>
>
> I am using qpid-send and qpid-receive as test clients. The sending client
> is invoked like this:
>
> qpid-send -b localhost:9800 -a awe.test.queue '--connection-option={protocol:amqp1.0}' --content-string 'test message Do 7. Dez 09:31:51 CET 2017' --durable=yes
>
>
> And there is a client which reads using:
>
> qpid-receive -b localhost:9802 -a awe.test.queue '--connection-option={protocol:amqp1.0}' -t --timeout 500
>
>
> I think I misconfigured something, but I cannot imagine what it could be.
>
> Btw. what protocol is used between the two brokers?
>
> I activated only the protocols AMQP and CORE.
>
> Any ideas?
>
> Kind Regards,
> Andreas
>
>
