This is an automated email from the ASF dual-hosted git repository. vavrtom pushed a commit to branch QPID-8683-set-id-for-delayed-delivery in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/QPID-8683-set-id-for-delayed-delivery by this push: new 5f0654b06b Set destination ID for DELAYED_DELIVERY producer 5f0654b06b is described below commit 5f0654b06b02811fbf9d7391724d96dbab09f40c Author: vavrtom <cz235...@tiscali.cz> AuthorDate: Fri Feb 21 08:35:00 2025 +0100 Set destination ID for DELAYED_DELIVERY producer --- .../org/apache/qpid/server/model/Producer.java | 26 +++++++++++++++++++++- .../org/apache/qpid/server/model/ProducerImpl.java | 15 +++---------- .../v1_0/StandardReceivingLinkEndpoint.java | 10 ++------- 3 files changed, 30 insertions(+), 21 deletions(-) diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Producer.java b/broker-core/src/main/java/org/apache/qpid/server/model/Producer.java index 75f7636fb9..a40e47cc58 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/model/Producer.java +++ b/broker-core/src/main/java/org/apache/qpid/server/model/Producer.java @@ -23,13 +23,37 @@ package org.apache.qpid.server.model; import java.util.UUID; import com.google.common.util.concurrent.ListenableFuture; +import org.apache.qpid.server.message.MessageDestination; @ManagedObject(creatable = false, amqpName = "org.apache.qpid.Producer") public interface Producer<X extends Producer<X>> extends ConfiguredObject<X> { enum DeliveryType { DELAYED_DELIVERY, STANDARD_DELIVERY } - enum DestinationType { EXCHANGE, QUEUE } + enum DestinationType + { + EXCHANGE, + QUEUE; + + public static DestinationType from(MessageDestination messageDestination) + { + if (messageDestination instanceof Exchange) + { + return EXCHANGE; + } + else if (messageDestination instanceof Queue) + { + return QUEUE; + } + return null; + } + + public static UUID getId(MessageDestination messageDestination) + { + final DestinationType destinationType = from(messageDestination); + return destinationType == null ? null : ((ConfiguredObject<?>) messageDestination).getId(); + } + } void registerMessageDelivered(long messageSize); diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/ProducerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/model/ProducerImpl.java index 26a8b70ebc..2333eddb93 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/model/ProducerImpl.java +++ b/broker-core/src/main/java/org/apache/qpid/server/model/ProducerImpl.java @@ -67,25 +67,16 @@ public class ProducerImpl<X extends Producer<X>> _sessionName = session.getName(); _principal = session.getAMQPConnection().getPrincipal(); _remoteAddress = session.getAMQPConnection().getRemoteAddress(); - _destination = messageDestination == null ? null : messageDestination.getName(); if (messageDestination == null) { _deliveryType = DeliveryType.DELAYED_DELIVERY; - _destinationType = null; } else { _deliveryType = DeliveryType.STANDARD_DELIVERY; - if (messageDestination instanceof Exchange) - { - _destinationId = ((Exchange<?>) messageDestination).getId(); - _destinationType = DestinationType.EXCHANGE; - } - else if (messageDestination instanceof Queue) - { - _destinationId = ((Queue<?>) messageDestination).getId(); - _destinationType = DestinationType.QUEUE; - } + _destination = messageDestination.getName(); + _destinationType = DestinationType.from(messageDestination); + _destinationId = DestinationType.getId(messageDestination); } registerWithParents(); diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java index 772935067c..df99809fe2 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java @@ -296,14 +296,8 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint { final MessageDestination messageDestination = getAddressSpace() .getAttainedMessageDestination(serverMessage.getTo(), false); - if (messageDestination != null) - { - final Producer.DestinationType destinationType = - messageDestination instanceof Exchange - ? Producer.DestinationType.EXCHANGE - : Producer.DestinationType.QUEUE; - _producer.setDestinationType(destinationType); - } + _producer.setDestinationType(Producer.DestinationType.from(messageDestination)); + _producer.setDestinationId(Producer.DestinationType.getId(messageDestination)); _producer.setDestination(to); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org