This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git
commit 519bbd010090ad1a2e4a4dec220540a32bafd7d3 Author: Robbie Gemmell <rob...@apache.org> AuthorDate: Tue Sep 24 16:59:27 2019 +0100 QPIDJMS-473: remove unused + incomplete anonymous fallback cache fragments --- .../amqp/AmqpAnonymousFallbackProducer.java | 109 +++------------------ .../qpid/jms/provider/amqp/AmqpConnection.java | 32 ------ 2 files changed, 15 insertions(+), 126 deletions(-) diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java index 292b360..807ce4b 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java @@ -16,9 +16,7 @@ */ package org.apache.qpid.jms.provider.amqp; -import java.util.Map; -import org.apache.qpid.jms.JmsDestination; import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; import org.apache.qpid.jms.meta.JmsProducerId; import org.apache.qpid.jms.meta.JmsProducerInfo; @@ -27,7 +25,6 @@ import org.apache.qpid.jms.provider.ProviderException; import org.apache.qpid.jms.provider.WrappedAsyncResult; import org.apache.qpid.jms.provider.amqp.builders.AmqpProducerBuilder; import org.apache.qpid.jms.util.IdGenerator; -import org.apache.qpid.jms.util.LRUCache; import org.apache.qpid.proton.engine.EndpointState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +40,6 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer { private static final Logger LOG = LoggerFactory.getLogger(AmqpAnonymousFallbackProducer.class); private static final IdGenerator producerIdGenerator = new IdGenerator(); - private final AnonymousProducerCache producerCache; private final String producerIdKey = producerIdGenerator.generateId(); private long producerIdCount; @@ -57,13 +53,6 @@ public class 
AmqpAnonymousFallbackProducer extends AmqpProducer { */ public AmqpAnonymousFallbackProducer(AmqpSession session, JmsProducerInfo info) { super(session, info); - - if (connection.isAnonymousProducerCache()) { - producerCache = new AnonymousProducerCache(10); - producerCache.setMaxCacheSize(connection.getAnonymousProducerCacheSize()); - } else { - producerCache = null; - } } @Override @@ -75,46 +64,23 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer { // the send. envelope.setSendAsync(false); - AmqpProducer producer = null; - if (connection.isAnonymousProducerCache()) { - producer = producerCache.get(envelope.getDestination()); - } + // Create a new ProducerInfo for the short lived producer that's created to perform the + // send to the given AMQP target. + JmsProducerInfo info = new JmsProducerInfo(getNextProducerId()); + info.setDestination(envelope.getDestination()); + info.setPresettle(this.getResourceInfo().isPresettle()); - if (producer == null) { - // Create a new ProducerInfo for the short lived producer that's created to perform the - // send to the given AMQP target. - JmsProducerInfo info = new JmsProducerInfo(getNextProducerId()); - info.setDestination(envelope.getDestination()); - info.setPresettle(this.getResourceInfo().isPresettle()); - - // We open a Fixed Producer instance with the target destination. Once it opens - // it will trigger the open event which will in turn trigger the send event. - // If caching is disabled the created producer will be closed immediately after - // the entire send chain has finished and the delivery has been acknowledged. - AmqpProducerBuilder builder = new AmqpProducerBuilder(session, info); - builder.buildResource(new AnonymousSendRequest(request, builder, envelope)); - - if (connection.isAnonymousProducerCache()) { - // Cache it in hopes of not needing to create large numbers of producers. 
- producerCache.put(envelope.getDestination(), builder.getResource()); - } + // We open a Fixed Producer instance with the target destination. Once it opens + // it will trigger the open event which will in turn trigger the send event. + // The created producer will be closed immediately after the delivery has been acknowledged. + AmqpProducerBuilder builder = new AmqpProducerBuilder(session, info); + builder.buildResource(new AnonymousSendRequest(request, builder, envelope)); - getParent().getProvider().pumpToProtonTransport(request); - } else { - producer.send(envelope, request); - } + getParent().getProvider().pumpToProtonTransport(request); } @Override public void close(AsyncResult request) { - // Trigger an immediate close, the internal producers that are currently in the cache - // if the cache is enabled. - if (connection.isAnonymousProducerCache()) { - for (AmqpProducer producer : producerCache.values()) { - producer.close(new CloseRequest(producer)); - } - } - request.onSuccess(); } @@ -201,22 +167,16 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer { @Override public void onFailure(ProviderException result) { LOG.trace("Send phase of anonymous send failed: {} ", getProducerId()); - if (!connection.isAnonymousProducerCache()) { - AnonymousCloseRequest close = new AnonymousCloseRequest(this); - producer.close(close); - } + AnonymousCloseRequest close = new AnonymousCloseRequest(this); + producer.close(close); super.onFailure(result); } @Override public void onSuccess() { LOG.trace("Send phase of anonymous send complete: {} ", getProducerId()); - if (!connection.isAnonymousProducerCache()) { - AnonymousCloseRequest close = new AnonymousCloseRequest(this); - producer.close(close); - } else { - super.onSuccess(); - } + AnonymousCloseRequest close = new AnonymousCloseRequest(this); + producer.close(close); } @Override @@ -246,43 +206,4 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer { return producer; } } - - private final 
class CloseRequest implements AsyncResult { - - private final AmqpProducer producer; - - public CloseRequest(AmqpProducer producer) { - this.producer = producer; - } - - @Override - public void onFailure(ProviderException result) { - AmqpAnonymousFallbackProducer.this.connection.getProvider().fireProviderException(result); - } - - @Override - public void onSuccess() { - LOG.trace("Close of anonymous producer {} complete", producer); - } - - @Override - public boolean isComplete() { - return producer.isClosed(); - } - } - - private final class AnonymousProducerCache extends LRUCache<JmsDestination, AmqpProducer> { - - private static final long serialVersionUID = 1L; - - public AnonymousProducerCache(int cacheSize) { - super(cacheSize); - } - - @Override - protected void onCacheEviction(Map.Entry<JmsDestination, AmqpProducer> cached) { - LOG.trace("Producer: {} evicted from producer cache", cached.getValue()); - cached.getValue().close(new CloseRequest(cached.getValue())); - } - } } diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java index 30c6e02..5d55e6c 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java @@ -56,8 +56,6 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn private AmqpConnectionSession connectionSession; private boolean objectMessageUsesAmqpTypes = false; - private boolean anonymousProducerCache = false; - private int anonymousProducerCacheSize = 10; public AmqpConnection(AmqpProvider provider, JmsConnectionInfo info, Connection protonConnection) { super(info, protonConnection, provider); @@ -202,36 +200,6 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn } /** - * @return true if anonymous producers should be cached or 
closed on send complete. - */ - public boolean isAnonymousProducerCache() { - return anonymousProducerCache; - } - - /** - * @param anonymousProducerCache - * enable or disables the caching or anonymous producers. - */ - public void setAnonymousProducerCache(boolean anonymousProducerCache) { - this.anonymousProducerCache = anonymousProducerCache; - } - - /** - * @return the number of anonymous producers stored in each cache. - */ - public int getAnonymousProducerCacheSize() { - return anonymousProducerCacheSize; - } - - /** - * @param anonymousProducerCacheSize - * the number of producers each anonymous producer instance will cache. - */ - public void setAnonymousProducerCacheSize(int anonymousProducerCacheSize) { - this.anonymousProducerCacheSize = anonymousProducerCacheSize; - } - - /** * @return true if new ObjectMessage instance should default to using AMQP Typed bodies. */ public boolean isObjectMessageUsesAmqpTypes() { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org