Added: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java?rev=1335290&view=auto ============================================================================== --- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java (added) +++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java Mon May 7 22:40:52 2012 @@ -0,0 +1,347 @@ +package org.apache.qpid.server.store; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +public class StoreOverfullTest extends QpidBrokerTestCase +{ + private static final int TIMEOUT = 10000; + public static final int TEST_SIZE = 150; + + private Connection _producerConnection; + private Connection _consumerConnection; + private Session _producerSession; + private Session _consumerSession; + private MessageProducer _producer; + private MessageConsumer _consumer; + private Queue _queue; + + //private final AtomicInteger sentMessages = new AtomicInteger(0); + + private static final int OVERFULL_SIZE = 4000000; + private static final int UNDERFULL_SIZE = 3500000; + + public void setUp() throws Exception + { + setConfigurationProperty("virtualhosts.virtualhost.test.store.overfull-size", + String.valueOf(OVERFULL_SIZE)); + setConfigurationProperty("virtualhosts.virtualhost.test.store.underfull-size", + String.valueOf(UNDERFULL_SIZE)); + setSystemProperty("qpid.bdb.envconfig.je.log.fileMax", "1000000"); + super.setUp(); + + _producerConnection = getConnection(); + _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _producerConnection.start(); + + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + } + + public void tearDown() throws Exception + { + try + { + _producerConnection.close(); + _consumerConnection.close(); + } + finally + { + super.tearDown(); + } + } + + /* + * Test: + * + * Send > threshold amount of data : Sender is blocked + * Remove 90% of data : Sender is unblocked + * + */ + public void testCapacityExceededCausesBlock() throws Exception + { + AtomicInteger sentMessages = new AtomicInteger(0); + _queue = getTestQueue(); + ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)_queue); + + _producer = _producerSession.createProducer(_queue); + + sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages); + + while(!((AMQSession)_producerSession).isBrokerFlowControlled()) + { + Thread.sleep(100l); + } + int sentCount = sentMessages.get(); + assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount); + + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); + + int mostMessages = (int) (0.9 * sentCount); + for(int i = 0; i < mostMessages; i++) + { + if(_consumer.receive(1000l) == null) + { + break; + } + } + + long targetTime = System.currentTimeMillis() + 5000l; + while(sentMessages.get() == sentCount && System.currentTimeMillis() < targetTime) + { + Thread.sleep(100l); + } + + assertFalse("Did not unblock on consuming messages", sentMessages.get() == sentCount); + + for(int i = mostMessages; i < TEST_SIZE; i++) + { + if(_consumer.receive(1000l) == null) + { + break; + } + } + + assertTrue("Not all messages were sent", sentMessages.get() == TEST_SIZE); + + } + + /* Two producers on different queues + */ + + public void testCapacityExceededCausesBlockTwoConnections() throws Exception + { + AtomicInteger sentMessages = new AtomicInteger(0); + AtomicInteger sentMessages2 = new AtomicInteger(0); + + _queue = getTestQueue(); + AMQQueue queue2 = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, getTestQueueName() + "_2"); + + ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)_queue); + + _producer = _producerSession.createProducer(_queue); + + Connection secondProducerConnection = getConnection(); + Session secondProducerSession = secondProducerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer secondProducer = secondProducerSession.createProducer(queue2); + + sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages); + sendMessagesAsync(secondProducer, secondProducerSession, TEST_SIZE, 50L, sentMessages2); + + while(!((AMQSession)_producerSession).isBrokerFlowControlled()) + { + Thread.sleep(100l); + } + int sentCount = sentMessages.get(); + assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount); + + + while(!((AMQSession)secondProducerSession).isBrokerFlowControlled()) + { + Thread.sleep(100l); + } + int sentCount2 = sentMessages2.get(); + assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount2); + + + _consumer = _consumerSession.createConsumer(_queue); + MessageConsumer consumer2 = _consumerSession.createConsumer(queue2); + _consumerConnection.start(); + + + for(int i = 0; i < 2*TEST_SIZE; i++) + { + if(_consumer.receive(1000l) == null + && consumer2.receive(1000l) == null) + { + break; + } + } + + assertEquals("Not all messages were sent from the first sender", TEST_SIZE, sentMessages.get()); + assertEquals("Not all messages were sent from the second sender", TEST_SIZE, sentMessages2.get()); + } + + /* + * New producers are blocked + */ + + public void testCapacityExceededCausesBlockNewConnection() throws Exception + { + AtomicInteger sentMessages = new AtomicInteger(0); + AtomicInteger sentMessages2 = new AtomicInteger(0); + + _queue = getTestQueue(); + + ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)_queue); + + _producer = _producerSession.createProducer(_queue); + + Connection secondProducerConnection = getConnection(); + Session secondProducerSession = secondProducerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer secondProducer = secondProducerSession.createProducer(_queue); + + sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages); + + while(!((AMQSession)_producerSession).isBrokerFlowControlled()) + { + Thread.sleep(100l); + } + int sentCount = sentMessages.get(); + assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount); + + sendMessagesAsync(secondProducer, secondProducerSession, TEST_SIZE, 50L, sentMessages2); + + while(!((AMQSession)secondProducerSession).isBrokerFlowControlled()) + { + Thread.sleep(100l); + } + int sentCount2 = sentMessages2.get(); + assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount2); + + + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); + + + for(int i = 0; i < 2*TEST_SIZE; i++) + { + if(_consumer.receive(2000l) == null) + { + break; + } + } + + assertEquals("Not all messages were sent from the first sender", TEST_SIZE, sentMessages.get()); + assertEquals("Not all messages were sent from the second sender", TEST_SIZE, sentMessages2.get()); + + } + + + + private MessageSender sendMessagesAsync(final MessageProducer producer, + final Session producerSession, + final int numMessages, + long sleepPeriod, + AtomicInteger sentMessages) + { + MessageSender sender = new MessageSender(producer, producerSession, numMessages,sleepPeriod, sentMessages); + new Thread(sender).start(); + return sender; + } + + private class MessageSender implements Runnable + { + private final MessageProducer _senderProducer; + private final Session _senderSession; + private final int _numMessages; + private volatile JMSException _exception; + private CountDownLatch _exceptionThrownLatch = new CountDownLatch(1); + private long _sleepPeriod; + private final AtomicInteger _sentMessages; + + public MessageSender(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod, AtomicInteger sentMessages) + { + _senderProducer = producer; + _senderSession = producerSession; + _numMessages = numMessages; + _sleepPeriod = sleepPeriod; + _sentMessages = sentMessages; + } + + public void run() + { + try + { + sendMessages(_senderProducer, _senderSession, _numMessages, _sleepPeriod, _sentMessages); + } + catch (JMSException e) + { + _exception = e; + _exceptionThrownLatch.countDown(); + } + } + + public Exception awaitSenderException(long timeout) throws InterruptedException + { + _exceptionThrownLatch.await(timeout, TimeUnit.MILLISECONDS); + return _exception; + } + } + + private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod, AtomicInteger sentMessages) + throws JMSException + { + + for (int msg = 0; msg < numMessages; msg++) + { + producer.send(nextMessage(msg, producerSession)); + sentMessages.incrementAndGet(); + + + try + { + ((AMQSession<?,?>)producerSession).sync(); + } + catch (AMQException e) + { + e.printStackTrace(); + throw new RuntimeException(e); + } + + try + { + Thread.sleep(sleepPeriod); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + } + + private static final byte[] BYTE_32K = new byte[32*1024]; + + private Message nextMessage(int msg, Session producerSession) throws JMSException + { + BytesMessage send = producerSession.createBytesMessage(); + send.writeBytes(BYTE_32K); + send.setIntProperty("msg", msg); + + return send; + } +}
Modified: qpid/trunk/qpid/java/test-profiles/CPPExcludes URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/CPPExcludes?rev=1335290&r1=1335289&r2=1335290&view=diff ============================================================================== --- qpid/trunk/qpid/java/test-profiles/CPPExcludes (original) +++ qpid/trunk/qpid/java/test-profiles/CPPExcludes Mon May 7 22:40:52 2012 @@ -121,6 +121,8 @@ org.apache.qpid.test.client.message.Sele //QPID-942 : Implemented Channel.Flow based Producer Side flow control to the Java Broker (not in CPP Broker) org.apache.qpid.server.queue.ProducerFlowControlTest#* +//QPID-3986 : Flow control invoked on total store disk usage +org.apache.qpid.server.store.StoreOverfullTest#* org.apache.qpid.test.client.RollbackOrderTest#testOrderingAfterRollbackOnMessage#* Modified: qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes?rev=1335290&r1=1335289&r2=1335290&view=diff ============================================================================== --- qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes (original) +++ qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes Mon May 7 22:40:52 2012 @@ -30,7 +30,6 @@ org.apache.qpid.test.unit.xa.TopicTest#t org.apache.qpid.test.unit.xa.TopicTest#testDurSubCrash org.apache.qpid.test.unit.xa.TopicTest#testRecover - org.apache.qpid.server.store.MessageStoreTest#testMessagePersistence org.apache.qpid.server.store.MessageStoreTest#testMessageRemoval org.apache.qpid.server.store.MessageStoreTest#testBindingPersistence @@ -39,6 +38,7 @@ org.apache.qpid.server.store.MessageStor org.apache.qpid.server.store.MessageStoreTest#testDurableQueueRemoval org.apache.qpid.server.store.MessageStoreTest#testExchangePersistence org.apache.qpid.server.store.MessageStoreTest#testDurableExchangeRemoval +org.apache.qpid.server.store.StoreOverfullTest#* org.apache.qpid.server.store.berkeleydb.* --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org