Github user gaohoward commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/907#discussion_r92076234
  
    --- Diff: 
artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
 ---
    @@ -302,108 +301,91 @@ public void send(final ProducerInfo producerInfo,
              
originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(),
 messageSend.getMessageId().toString());
           }
     
    -      Runnable runnable;
    -
    -      if (sendProducerAck) {
    -         runnable = new Runnable() {
    -            @Override
    -            public void run() {
    -               try {
    -                  ProducerAck ack = new 
ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    -                  connection.dispatchSync(ack);
    -               } catch (Exception e) {
    -                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    -                  connection.sendException(e);
    -               }
    -
    -            }
    -         };
    -      } else {
    -         final Connection transportConnection = 
connection.getTransportConnection();
    -
    -         if (transportConnection == null) {
    -            // I don't think this could happen, but just in case, avoiding 
races
    -            runnable = null;
    -         } else {
    -            runnable = new Runnable() {
    -               @Override
    -               public void run() {
    -                  transportConnection.setAutoRead(true);
    -               }
    -            };
    -         }
    -      }
    -
    -      internalSend(actualDestinations, originalCoreMsg, runnable);
    -   }
    +      boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || 
messageSend.isResponseRequired();
     
    -   private void internalSend(ActiveMQDestination[] actualDestinations,
    -                             ServerMessage originalCoreMsg,
    -                             final Runnable onComplete) throws Exception {
    +      final AtomicInteger count = new 
AtomicInteger(actualDestinations.length);
     
    -      Runnable runToUse;
    +      final Exception[] anyException = new Exception[] {null};
     
    -      if (actualDestinations.length <= 1 || onComplete == null) {
    -         // if onComplete is null, this will be null ;)
    -         runToUse = onComplete;
    -      } else {
    -         final AtomicInteger count = new 
AtomicInteger(actualDestinations.length);
    -         runToUse = new Runnable() {
    -            @Override
    -            public void run() {
    -               if (count.decrementAndGet() == 0) {
    -                  onComplete.run();
    -               }
    -            }
    -         };
    +      if (shouldBlockProducer) {
    +         connection.getContext().setDontSendReponse(true);
           }
     
    -      SimpleString[] addresses = new 
SimpleString[actualDestinations.length];
    -      PagingStore[] pagingStores = new 
PagingStore[actualDestinations.length];
    -
    -      // We fillup addresses, pagingStores and we will throw failure if 
that's the case
           for (int i = 0; i < actualDestinations.length; i++) {
              ActiveMQDestination dest = actualDestinations[i];
    -         addresses[i] = new SimpleString(dest.getPhysicalName());
    -         pagingStores[i] = 
server.getPagingManager().getPageStore(addresses[i]);
    -         if (pagingStores[i].getAddressFullMessagePolicy() == 
AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
    -            throw new ResourceAllocationException("Queue is full");
    -         }
    -      }
    -
    -      for (int i = 0; i < actualDestinations.length; i++) {
    +         SimpleString address = new SimpleString(dest.getPhysicalName());
    +         PagingStore store = 
server.getPagingManager().getPageStore(address);
     
              ServerMessage coreMsg = originalCoreMsg.copy();
    +         coreMsg.setAddress(address);
     
    -         coreMsg.setAddress(addresses[i]);
    -
    -         PagingStore store = pagingStores[i];
    -
    -         if (store.isFull()) {
    -            connection.getTransportConnection().setAutoRead(false);
    -         }
    +         if (shouldBlockProducer) {
     
    -         if (actualDestinations[i].isQueue()) {
    -            checkAutoCreateQueue(new 
SimpleString(actualDestinations[i].getPhysicalName()), 
actualDestinations[i].isTemporary());
    -         }
    +            if (!store.checkMemory(() -> {
    +               try {
    +                  RoutingStatus result = getCoreSession().send(coreMsg, 
false, dest.isTemporary());
     
    -         if (actualDestinations[i].isQueue()) {
    -            
coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE,
 RoutingType.ANYCAST.getType());
    +                  if (result == RoutingStatus.NO_BINDINGS && 
dest.isQueue()) {
    +                     throw new InvalidDestinationException("Cannot publish 
to a non-existent Destination: " + dest);
    +                  }
    +               } catch (Exception e) {
    +                  if (anyException[0] == null) {
    +                     anyException[0] = e;
    +                  }
    +               }
    +               if (count.decrementAndGet() == 0) {
    +                  if (anyException[0] != null) {
    +                     
ActiveMQServerLogger.LOGGER.warn(anyException[0].getMessage(), anyException[0]);
    +                     connection.sendException(anyException[0]);
    +                  } else {
    +                     if (sendProducerAck) {
    +                        try {
    +                           ProducerAck ack = new 
ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    +                           connection.dispatchAsync(ack);
    +                        } catch (Exception e) {
    +                           
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     } else {
    +                        try {
    +                           //maybe use this: 
connection.getContext().setDontSendReponse(false);
    +                           Response response = new Response();
    +                           
response.setCorrelationId(messageSend.getCommandId());
    +                           connection.dispatchAsync(response);
    +                        } catch (Exception e) {
    +                           
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     }
    +                  }
    +               }
    +            })) {
    +               this.connection.getContext().setDontSendReponse(false);
    +               throw new ResourceAllocationException("Queue is full " + 
address);
    +            }
              } else {
    -            
coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE,
 RoutingType.MULTICAST.getType());
    -         }
    -         RoutingStatus result = getCoreSession().send(coreMsg, false, 
actualDestinations[i].isTemporary());
    +            //non-persistent messages goes here, by default we block on 
the calling thread
    +            //which means the whole connection is blocked when address is 
full.
    +            final CountDownLatch latch = new CountDownLatch(1);
    +            if (!store.checkMemory(() -> {
    +               latch.countDown();
    +            })) {
    +               throw new ResourceAllocationException("Queue is full " + 
address);
    +            }
     
    -         if (result == RoutingStatus.NO_BINDINGS && 
actualDestinations[i].isQueue()) {
    -            throw new InvalidDestinationException("Cannot publish to a 
non-existent Destination: " + actualDestinations[i]);
    -         }
    +            latch.await();
    --- End diff --
    
    Yes the openwire producer flow control has a window size to use, but it 
mostly controlled at client side (it doesn't ask for credits from server like 
core does). 
    The reason why we block the entire connection is that openwire has a 
specific test for this use case:
    
org.apache.activemq.artemis.tests.integration.openwire.amq.ProducerFlowControlTest#test2ndPublisherWithStandardConnectionThatIsBlocked
    It is usually not a good option but there are other options the user can 
use (see other tests in ProducerFlowControlTest).



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to