Repository: activemq-artemis Updated Branches: refs/heads/2.6.x b5c862feb -> d54cce16c
ARTEMIS-2159 OpenWire would allow one extra send Thanks to Otavio Piske for collaborating on a test change here. (cherry picked from commit 02a6d5bb493d6e0eea1ed847157d4e6b57aacf7f) Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d54cce16 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d54cce16 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d54cce16 Branch: refs/heads/2.6.x Commit: d54cce16cb3c8a5169fe8d076b0e0a9a4e388561 Parents: b5c862f Author: Clebert Suconic <clebertsuco...@apache.org> Authored: Wed Oct 31 09:13:05 2018 -0400 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Wed Oct 31 12:46:57 2018 -0400 ---------------------------------------------------------------------- .../core/protocol/openwire/amq/AMQSession.java | 100 +++++++++---------- .../core/paging/impl/PagingStoreImpl.java | 4 +- .../openwire/OpenWireFlowControlFailTest.java | 30 ++++-- 3 files changed, 76 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d54cce16/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 0250f1c..a107ba7 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -443,63 +443,63 @@ 
public class AMQSession implements SessionCallback { final AtomicInteger count, final org.apache.activemq.artemis.api.core.Message coreMsg, final SimpleString address) throws ResourceAllocationException { - if (!store.checkMemory(() -> { - Exception exceptionToSend = null; - - try { - getCoreSession().send(coreMsg, false, dest.isTemporary()); - } catch (Exception e) { - logger.warn(e.getMessage(), e); - exceptionToSend = e; - } + if (!store.checkMemory(null)) { + this.connection.getContext().setDontSendReponse(false); connection.enableTtl(); - if (count == null || count.decrementAndGet() == 0) { - if (exceptionToSend != null) { - this.connection.getContext().setDontSendReponse(false); - connection.sendException(exceptionToSend); - } else { - server.getStorageManager().afterCompleteOperations(new IOCallback() { - @Override - public void done() { - if (sendProducerAck) { - try { - ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize()); - connection.dispatchAsync(ack); - } catch (Exception e) { - connection.getContext().setDontSendReponse(false); - ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); - connection.sendException(e); - } - } else { + throw new ResourceAllocationException("Queue is full " + address); + } + + Exception exceptionToSend = null; + + try { + getCoreSession().send(coreMsg, false, dest.isTemporary()); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + exceptionToSend = e; + } + connection.enableTtl(); + if (count == null || count.decrementAndGet() == 0) { + if (exceptionToSend != null) { + this.connection.getContext().setDontSendReponse(false); + connection.sendException(exceptionToSend); + } else { + server.getStorageManager().afterCompleteOperations(new IOCallback() { + @Override + public void done() { + if (sendProducerAck) { + try { + ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize()); + connection.dispatchAsync(ack); + } catch (Exception e) { 
connection.getContext().setDontSendReponse(false); - try { - Response response = new Response(); - response.setCorrelationId(messageSend.getCommandId()); - connection.dispatchAsync(response); - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); - connection.sendException(e); - } + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + connection.sendException(e); } - } - - @Override - public void onError(int errorCode, String errorMessage) { + } else { + connection.getContext().setDontSendReponse(false); try { - final IOException e = new IOException(errorMessage); - ActiveMQServerLogger.LOGGER.warn(errorMessage); - connection.serviceException(e); - } catch (Exception ex) { - ActiveMQServerLogger.LOGGER.debug(ex); + Response response = new Response(); + response.setCorrelationId(messageSend.getCommandId()); + connection.dispatchAsync(response); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + connection.sendException(e); } } - }); - } + } + + @Override + public void onError(int errorCode, String errorMessage) { + try { + final IOException e = new IOException(errorMessage); + ActiveMQServerLogger.LOGGER.warn(errorMessage); + connection.serviceException(e); + } catch (Exception ex) { + ActiveMQServerLogger.LOGGER.debug(ex); + } + } + }); } - })) { - this.connection.getContext().setDontSendReponse(false); - connection.enableTtl(); - throw new ResourceAllocationException("Queue is full " + address); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d54cce16/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 00001cc..908ab9f 100644 --- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -693,7 +693,9 @@ public class PagingStoreImpl implements PagingStore { } } - runWhenAvailable.run(); + if (runWhenAvailable != null) { + runWhenAvailable.run(); + } return true; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d54cce16/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireFlowControlFailTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireFlowControlFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireFlowControlFailTest.java index 341f920..a2685b0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireFlowControlFailTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireFlowControlFailTest.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.openwire; import javax.jms.Connection; import javax.jms.ConnectionFactory; +import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; @@ -58,34 +59,49 @@ public class OpenWireFlowControlFailTest extends OpenWireTestBase { textBody.append(" "); } ConnectionFactory factory = new ActiveMQConnectionFactory(urlString); + int numberOfMessage = 0; try (Connection connection = factory.createConnection()) { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); javax.jms.Queue queue = session.createQueue(addressInfo.getName().toString()); MessageProducer producer = session.createProducer(queue); - int numberOfMessage = 0; boolean 
failed = false; try { for (int i = 0; i < 1000; i++) { - producer.send(session.createTextMessage(textBody.toString())); + TextMessage message = session.createTextMessage(textBody.toString()); + message.setIntProperty("i", i); + + producer.send(message); numberOfMessage++; } } catch (Exception e) { e.printStackTrace(System.out); failed = true; + try { + producer.send(session.createTextMessage(textBody.toString())); + Assert.fail("Exception expected"); + } catch (JMSException expected) { + expected.printStackTrace(); + + } } + Assert.assertTrue(failed); + } - System.out.println("Message failed with " + numberOfMessage); + factory = new ActiveMQConnectionFactory(urlString); + try (Connection connection2 = factory.createConnection()) { + Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = session2.createQueue(addressInfo.getName().toString()); - Assert.assertTrue(failed); - MessageConsumer consumer = session.createConsumer(queue); - connection.start(); + MessageConsumer consumer = session2.createConsumer(queue); + connection2.start(); for (int i = 0; i < numberOfMessage; i++) { TextMessage message = (TextMessage) consumer.receive(5000); Assert.assertNotNull(message); Assert.assertEquals(textBody.toString(), message.getText()); } - Assert.assertNull(consumer.receiveNoWait()); + TextMessage msg = (TextMessage)consumer.receive(500); + Assert.assertNull(msg); } } }