gemmellr commented on code in PR #4817:
URL: https://github.com/apache/activemq-artemis/pull/4817#discussion_r1486082922


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java:
##########
@@ -294,14 +315,232 @@ protected boolean interceptLinkClosedEvent(Link link) {
       return false;
    }
 
+   private void asyncCreateTargetEventsSender(AMQPFederationCommandDispatcher 
commandLink) {
+      // If no remote policies configured then we don't need an events sender 
link
+      // currently, if some other use is added for this link this code must be
+      // removed and tests updated to expect this link to always be created.
+      if (remoteAddressMatchPolicies.isEmpty() && 
remoteQueueMatchPolicies.isEmpty()) {
+         return;
+      }
+
+      // Schedule the outgoing event link creation on the connection event 
loop thread.
+      //
+      // Eventual establishment of the outgoing events link or refusal informs 
this side
+      // of the connection as to whether the remote side supports receiving 
events for
+      // resources that it attempted to federate but they did not exist at the 
time and
+      // were subsequently added or for resources that might have been later 
removed via
+      // management and then subsequently re-added.
+      //
+      // Once the outcome of the event link is known then send any remote 
address or queue
+      // federation policies so that the remote can start federation of local 
addresses or
+      // queues to itself. This ordering prevents and race on creation of the 
events link
+      // and any federation consumer creation from the remote.
+      connection.runLater(() -> {
+         if (!isStarted()) {
+            return;
+         }
+
+         try {
+            final Sender sender = session.getSession().sender(
+               "Federation-events-sender:" + getName() + ":" + 
UUIDGenerator.getInstance().generateStringUUID());
+            final Target target = new Target();
+            final Source source = new Source();
+
+            target.setDynamic(true);
+            target.setCapabilities(new Symbol[] 
{Symbol.valueOf("temporary-topic")});

Review Comment:
   AmqpSupport.TEMP_TOPIC_CAPABILITY



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java:
##########
@@ -294,14 +315,232 @@ protected boolean interceptLinkClosedEvent(Link link) {
       return false;
    }
 
+   private void asyncCreateTargetEventsSender(AMQPFederationCommandDispatcher 
commandLink) {
+      // If no remote policies configured then we don't need an events sender 
link
+      // currently, if some other use is added for this link this code must be
+      // removed and tests updated to expect this link to always be created.
+      if (remoteAddressMatchPolicies.isEmpty() && 
remoteQueueMatchPolicies.isEmpty()) {
+         return;
+      }
+
+      // Schedule the outgoing event link creation on the connection event 
loop thread.
+      //
+      // Eventual establishment of the outgoing events link or refusal informs 
this side
+      // of the connection as to whether the remote side supports receiving 
events for
+      // resources that it attempted to federate but they did not exist at the 
time and
+      // were subsequently added or for resources that might have been later 
removed via
+      // management and then subsequently re-added.
+      //
+      // Once the outcome of the event link is known then send any remote 
address or queue
+      // federation policies so that the remote can start federation of local 
addresses or
+      // queues to itself. This ordering prevents and race on creation of the 
events link
+      // and any federation consumer creation from the remote.
+      connection.runLater(() -> {
+         if (!isStarted()) {
+            return;
+         }
+
+         try {
+            final Sender sender = session.getSession().sender(
+               "Federation-events-sender:" + getName() + ":" + 
UUIDGenerator.getInstance().generateStringUUID());
+            final Target target = new Target();
+            final Source source = new Source();
+
+            target.setDynamic(true);
+            target.setCapabilities(new Symbol[] 
{Symbol.valueOf("temporary-topic")});
+            target.setDurable(TerminusDurability.NONE);
+            target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+            // Set the dynamic node lifetime-policy to indicate this needs to 
be destroyed on close
+            // we don't want event links nodes remaining once a federation 
connection is closed.
+            final Map<Symbol, Object> dynamicNodeProperties = new HashMap<>();
+            dynamicNodeProperties.put(AmqpSupport.LIFETIME_POLICY, 
DeleteOnClose.getInstance());
+            target.setDynamicNodeProperties(dynamicNodeProperties);
+
+            sender.setSenderSettleMode(SenderSettleMode.SETTLED);
+            sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+            sender.setDesiredCapabilities(EVENT_LINK_CAPABILITIES);
+            sender.setTarget(target);
+            sender.setSource(source);
+            sender.open();
+
+            final ScheduledFuture<?> futureTimeout;
+            final AtomicBoolean cancelled = new AtomicBoolean(false);
+
+            if (brokerConnection.getConnectionTimeout() > 0) {
+               futureTimeout = 
brokerConnection.getServer().getScheduledPool().schedule(() -> {
+                  cancelled.set(true);
+                  
brokerConnection.connectError(ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionTimeout());
+               }, brokerConnection.getConnectionTimeout(), 
TimeUnit.MILLISECONDS);
+            } else {
+               futureTimeout = null;
+            }
+
+            // Using attachments to set up a Runnable that will be executed 
inside the remote link opened handler
+            sender.attachments().set(AMQP_LINK_INITIALIZER_KEY, 
Runnable.class, () -> {
+               try {
+                  if (cancelled.get()) {
+                     return;
+                  }
+
+                  if (futureTimeout != null) {
+                     futureTimeout.cancel(false);
+                  }
+
+                  if (sender.getRemoteTarget() == null || 
!AmqpSupport.verifyOfferedCapabilities(sender)) {
+                     // Sender rejected or not an event link endpoint so close 
as we will
+                     // not support sending events to the remote but otherwise 
will operate
+                     // as normal.
+                     sender.close();
+                  } else {
+                     session.addFederationEventDispatcher(sender);
+                  }
+
+                  // Once we know whether the events support is active or not 
we can send
+                  // the remote federation policies and allow the remote 
federation links
+                  // to start forming.
+
+                  remoteQueueMatchPolicies.forEach((key, policy) -> {
+                     try {
+                        commandLink.sendPolicy(policy);
+                     } catch (Exception e) {
+                        brokerConnection.error(e);
+                     }
+                  });
+
+                  remoteAddressMatchPolicies.forEach((key, policy) -> {
+                     try {
+                        commandLink.sendPolicy(policy);
+                     } catch (Exception e) {
+                        brokerConnection.error(e);
+                     }
+                  });
+
+               } catch (Exception e) {
+                  brokerConnection.error(e);
+               }
+            });
+         } catch (Exception e) {
+            brokerConnection.error(e);
+         }
+
+         connection.flush();
+      });
+   }
+
+   private void asnycCreateTargetEventsReceiver() {
+      // If no local policies configured then we don't need an events receiver 
link
+      // currently, if some other use is added for this link this code must be
+      // removed and tests updated to expect this link to always be created.
+      if (addressMatchPolicies.isEmpty() && queueMatchPolicies.isEmpty()) {
+         return;
+      }
+
+      // Schedule the incoming event link creation on the connection event 
loop thread.
+      //
+      // Eventual establishment of the incoming event link or refusal informs 
this side
+      // of the connection as to whether the remote will send events for 
addresses or
+      // queues that were not present when a federation consumer attempt had 
failed and
+      // were later added or an existing federation consumer was closed due to 
management
+      // action and those resource are once again available for federation.
+      //
+      // Once the outcome of the event link is known then start all the policy 
managers
+      // which will start federation from remote addresses and queues to this 
broker.
+      // This ordering prevents any races around the events receiver creation 
and creation
+      // of federation consumers on the remote.
+      connection.runLater(() -> {
+         if (!isStarted()) {
+            return;
+         }
+
+         try {
+            final Receiver receiver = session.getSession().receiver(
+               "Federation-events-receiver:" + getName() + ":" + 
UUIDGenerator.getInstance().generateStringUUID());
+
+            final Target target = new Target();
+            final Source source = new Source();
+
+            source.setDynamic(true);
+            source.setCapabilities(new Symbol[] 
{Symbol.valueOf("temporary-topic")});

Review Comment:
   AmqpSupport.TEMP_TOPIC_CAPABILITY



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationEventSupport.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.connect.federation;
+
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.EVENT_TYPE;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.REQUESTED_ADDRESS_ADDED;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.REQUESTED_ADDRESS_NAME;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.REQUESTED_QUEUE_ADDED;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.REQUESTED_QUEUE_NAME;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
+import 
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
+import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.WritableBuffer;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+
+/**
+ * Tools used for sending and receiving events inside AMQP message instance.
+ */
+public final class AMQPFederationEventSupport {
+
+   /**
+    * Encode an event that indicates that a Queue that belongs to a federation
+    * request which was not present at the time of the request or was later 
removed
+    * is now present and the remote should check for demand and attempt to 
federate
+    * the resource once again.
+    *
+    * @param address
+    *    The address that the queue is currently bound to.
+    * @param queue
+    *    The queue that was part of a previous federation request.
+    *
+    * @return the AMQP message with the encoded event data.
+    */
+   public static AMQPMessage encodeQueueAddedEvent(String address, String 
queue) {
+      final Map<Symbol, Object> annotations = new LinkedHashMap<>();
+      final MessageAnnotations messageAnnotations = new 
MessageAnnotations(annotations);
+      final Map<String, Object> eventMap = new LinkedHashMap<>();
+      final Section sectionBody = new AmqpValue(eventMap);
+      final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+
+      annotations.put(EVENT_TYPE, REQUESTED_QUEUE_ADDED);
+
+      eventMap.put(REQUESTED_ADDRESS_NAME, address);
+      eventMap.put(REQUESTED_QUEUE_NAME, queue);
+
+      try {
+         final EncoderImpl encoder = TLSEncode.getEncoder();
+         encoder.setByteBuffer(new NettyWritable(buffer));
+         encoder.writeObject(messageAnnotations);
+         encoder.writeObject(sectionBody);
+
+         final byte[] data = new byte[buffer.writerIndex()];
+         buffer.readBytes(data);
+
+         return new AMQPStandardMessage(0, data, null);
+      } finally {
+         TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
+         buffer.release();
+      }
+   }
+
+   /**
+    * Encode an event that indicates that an Address that belongs to a 
federation
+    * request which was not present at the time of the request or was later 
removed
+    * is now present and the remote should check for demand and attempt to 
federate
+    * the resource once again.
+    *
+    * @param address
+    *    The address portion of the previously failed federation request
+    *
+    * @return the AMQP message with the encoded event data.
+    */
+   public static AMQPMessage encodeAddressAddedEvent(String address) {
+      final Map<Symbol, Object> annotations = new LinkedHashMap<>();
+      final MessageAnnotations messageAnnotations = new 
MessageAnnotations(annotations);
+      final Map<String, Object> eventMap = new LinkedHashMap<>();
+      final Section sectionBody = new AmqpValue(eventMap);
+      final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+
+      annotations.put(EVENT_TYPE, REQUESTED_ADDRESS_ADDED);
+
+      eventMap.put(REQUESTED_ADDRESS_NAME, address);
+
+      try {
+         final EncoderImpl encoder = TLSEncode.getEncoder();
+         encoder.setByteBuffer(new NettyWritable(buffer));
+         encoder.writeObject(messageAnnotations);
+         encoder.writeObject(sectionBody);
+
+         final byte[] data = new byte[buffer.writerIndex()];
+         buffer.readBytes(data);
+
+         return new AMQPStandardMessage(0, data, null);
+      } finally {
+         TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
+         buffer.release();
+      }
+   }
+
+   /**
+    * Decode and return the Map containing the event data for a Queue that was
+    * the target of a previous federation request which was not present on the
+    * remote server or was later removed has now been (re)added.
+    *
+    * @param message
+    *    The event message that carries the event data in its body.
+    *
+    * @return a {@link Map} containing the payload of the incoming event.
+    *
+    * @throws ActiveMQException if an error occurs while decoding the event 
data.
+    */
+   @SuppressWarnings("unchecked")
+   public static Map<String, Object> decodeQueueAddedEvent(AMQPMessage 
message) throws ActiveMQException {
+      final Section body = message.getBody();
+
+      if (!(body instanceof AmqpValue)) {
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Message body was not an AmqpValue type");
+      }
+
+      final AmqpValue bodyValue = (AmqpValue) body;
+
+      if (bodyValue.getValue() == null || !(bodyValue.getValue() instanceof 
Map)) {

Review Comment:
   superfluous null check



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java:
##########
@@ -294,14 +315,232 @@ protected boolean interceptLinkClosedEvent(Link link) {
       return false;
    }
 
+   private void asyncCreateTargetEventsSender(AMQPFederationCommandDispatcher 
commandLink) {
+      // If no remote policies configured then we don't need an events sender 
link
+      // currently, if some other use is added for this link this code must be
+      // removed and tests updated to expect this link to always be created.
+      if (remoteAddressMatchPolicies.isEmpty() && 
remoteQueueMatchPolicies.isEmpty()) {
+         return;
+      }
+
+      // Schedule the outgoing event link creation on the connection event 
loop thread.
+      //
+      // Eventual establishment of the outgoing events link or refusal informs 
this side
+      // of the connection as to whether the remote side supports receiving 
events for
+      // resources that it attempted to federate but they did not exist at the 
time and
+      // were subsequently added or for resources that might have been later 
removed via
+      // management and then subsequently re-added.
+      //
+      // Once the outcome of the event link is known then send any remote 
address or queue
+      // federation policies so that the remote can start federation of local 
addresses or
+      // queues to itself. This ordering prevents and race on creation of the 
events link

Review Comment:
   prevents and race -> prevents a race ?



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationEventSupport.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.connect.federation;
+
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.EVENT_TYPE;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.REQUESTED_ADDRESS_ADDED;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.REQUESTED_ADDRESS_NAME;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.REQUESTED_QUEUE_ADDED;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.REQUESTED_QUEUE_NAME;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
+import 
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
+import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.WritableBuffer;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+
+/**
+ * Tools used for sending and receiving events inside AMQP message instance.
+ */
+public final class AMQPFederationEventSupport {
+
+   /**
+    * Encode an event that indicates that a Queue that belongs to a federation
+    * request which was not present at the time of the request or was later 
removed
+    * is now present and the remote should check for demand and attempt to 
federate
+    * the resource once again.
+    *
+    * @param address
+    *    The address that the queue is currently bound to.
+    * @param queue
+    *    The queue that was part of a previous federation request.
+    *
+    * @return the AMQP message with the encoded event data.
+    */
+   public static AMQPMessage encodeQueueAddedEvent(String address, String 
queue) {
+      final Map<Symbol, Object> annotations = new LinkedHashMap<>();
+      final MessageAnnotations messageAnnotations = new 
MessageAnnotations(annotations);
+      final Map<String, Object> eventMap = new LinkedHashMap<>();
+      final Section sectionBody = new AmqpValue(eventMap);
+      final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+
+      annotations.put(EVENT_TYPE, REQUESTED_QUEUE_ADDED);
+
+      eventMap.put(REQUESTED_ADDRESS_NAME, address);
+      eventMap.put(REQUESTED_QUEUE_NAME, queue);
+
+      try {
+         final EncoderImpl encoder = TLSEncode.getEncoder();
+         encoder.setByteBuffer(new NettyWritable(buffer));
+         encoder.writeObject(messageAnnotations);
+         encoder.writeObject(sectionBody);
+
+         final byte[] data = new byte[buffer.writerIndex()];
+         buffer.readBytes(data);
+
+         return new AMQPStandardMessage(0, data, null);
+      } finally {
+         TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
+         buffer.release();
+      }
+   }
+
+   /**
+    * Encode an event that indicates that an Address that belongs to a 
federation
+    * request which was not present at the time of the request or was later 
removed
+    * is now present and the remote should check for demand and attempt to 
federate
+    * the resource once again.
+    *
+    * @param address
+    *    The address portion of the previously failed federation request
+    *
+    * @return the AMQP message with the encoded event data.
+    */
+   public static AMQPMessage encodeAddressAddedEvent(String address) {
+      final Map<Symbol, Object> annotations = new LinkedHashMap<>();
+      final MessageAnnotations messageAnnotations = new 
MessageAnnotations(annotations);
+      final Map<String, Object> eventMap = new LinkedHashMap<>();
+      final Section sectionBody = new AmqpValue(eventMap);
+      final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+
+      annotations.put(EVENT_TYPE, REQUESTED_ADDRESS_ADDED);
+
+      eventMap.put(REQUESTED_ADDRESS_NAME, address);
+
+      try {
+         final EncoderImpl encoder = TLSEncode.getEncoder();
+         encoder.setByteBuffer(new NettyWritable(buffer));
+         encoder.writeObject(messageAnnotations);
+         encoder.writeObject(sectionBody);
+
+         final byte[] data = new byte[buffer.writerIndex()];
+         buffer.readBytes(data);
+
+         return new AMQPStandardMessage(0, data, null);
+      } finally {
+         TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
+         buffer.release();
+      }
+   }
+
+   /**
+    * Decode and return the Map containing the event data for a Queue that was
+    * the target of a previous federation request which was not present on the
+    * remote server or was later removed has now been (re)added.
+    *
+    * @param message
+    *    The event message that carries the event data in its body.
+    *
+    * @return a {@link Map} containing the payload of the incoming event.
+    *
+    * @throws ActiveMQException if an error occurs while decoding the event 
data.
+    */
+   @SuppressWarnings("unchecked")
+   public static Map<String, Object> decodeQueueAddedEvent(AMQPMessage 
message) throws ActiveMQException {
+      final Section body = message.getBody();
+
+      if (!(body instanceof AmqpValue)) {
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Message body was not an AmqpValue type");
+      }
+
+      final AmqpValue bodyValue = (AmqpValue) body;
+
+      if (bodyValue.getValue() == null || !(bodyValue.getValue() instanceof 
Map)) {
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Message body AmqpValue did not carry an encoded Map");
+      }
+
+      try {
+         final Map<String, Object> eventMap = (Map<String, Object>) 
bodyValue.getValue();
+
+         if (!eventMap.containsKey(REQUESTED_ADDRESS_NAME)) {
+            throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationEventMessage(
+               "Message body did not carry the required address name");
+         }
+
+         if (!eventMap.containsKey(REQUESTED_QUEUE_NAME)) {
+            throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationEventMessage(
+               "Message body did not carry the required queue name");
+         }
+
+         return eventMap;
+      } catch (ActiveMQException amqEx) {
+         throw amqEx;
+      } catch (Exception e) {
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Invalid encoded queue added event entry: " + e.getMessage());
+      }
+   }
+
+   /**
+    * Decode and return the Map containing the event data for an Address that 
was
+    * the target of a previous federation request which was not present on the
+    * remote server or was later removed has now been (re)added.
+    *
+    * @param message
+    *    The event message that carries the event data in its body.
+    *
+    * @return a {@link Map} containing the payload of the incoming event.
+    *
+    * @throws ActiveMQException if an error occurs while decoding the event 
data.
+    */
+   @SuppressWarnings("unchecked")
+   public static Map<String, Object> decodeAddressAddedEvent(AMQPMessage 
message) throws ActiveMQException {
+      final Section body = message.getBody();
+
+      if (!(body instanceof AmqpValue)) {
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Message body was not an AmqpValue type");
+      }
+
+      final AmqpValue bodyValue = (AmqpValue) body;
+
+      if (bodyValue.getValue() == null || !(bodyValue.getValue() instanceof 
Map)) {

Review Comment:
   Same



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to