Added: 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java?rev=1335290&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java
 (added)
+++ 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java
 Mon May  7 22:40:52 2012
@@ -0,0 +1,347 @@
+package org.apache.qpid.server.store;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class StoreOverfullTest extends QpidBrokerTestCase
+{
+    private static final int TIMEOUT = 10000;
+    public static final int TEST_SIZE = 150;
+
+    private Connection _producerConnection;
+    private Connection _consumerConnection;
+    private Session _producerSession;
+    private Session _consumerSession;
+    private MessageProducer _producer;
+    private MessageConsumer _consumer;
+    private Queue _queue;
+
+    //private final AtomicInteger sentMessages = new AtomicInteger(0);
+
+    private static final int OVERFULL_SIZE = 4000000;
+    private static final int UNDERFULL_SIZE = 3500000;
+
+    public void setUp() throws Exception
+    {
+        
setConfigurationProperty("virtualhosts.virtualhost.test.store.overfull-size",
+                String.valueOf(OVERFULL_SIZE));
+        
setConfigurationProperty("virtualhosts.virtualhost.test.store.underfull-size",
+                String.valueOf(UNDERFULL_SIZE));
+        setSystemProperty("qpid.bdb.envconfig.je.log.fileMax", "1000000");
+        super.setUp();
+
+        _producerConnection = getConnection();
+        _producerSession = _producerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+        _producerConnection.start();
+
+        _consumerConnection = getConnection();
+        _consumerSession = _consumerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+    }
+
+    public void tearDown() throws Exception
+    {
+        try
+        {
+            _producerConnection.close();
+            _consumerConnection.close();
+        }
+        finally
+        {
+            super.tearDown();
+        }
+    }
+
+    /*
+     * Test:
+     *
+     * Send > threshold amount of data : Sender is blocked
+     * Remove 90% of data : Sender is unblocked
+     *
+     */
+    public void testCapacityExceededCausesBlock() throws Exception
+    {
+        AtomicInteger sentMessages = new AtomicInteger(0);
+        _queue = getTestQueue();
+        ((AMQSession<?,?>) 
_producerSession).declareAndBind((AMQDestination)_queue);
+
+        _producer = _producerSession.createProducer(_queue);
+
+        sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, 
sentMessages);
+
+        while(!((AMQSession)_producerSession).isBrokerFlowControlled())
+        {
+            Thread.sleep(100l);
+        }
+        int sentCount = sentMessages.get();
+        assertFalse("Did not block before sending all messages", TEST_SIZE == 
sentCount);
+
+        _consumer = _consumerSession.createConsumer(_queue);
+        _consumerConnection.start();
+
+        int mostMessages = (int) (0.9 * sentCount);
+        for(int i = 0; i < mostMessages; i++)
+        {
+            if(_consumer.receive(1000l) == null)
+            {
+                break;
+            }
+        }
+        
+        long targetTime = System.currentTimeMillis() + 5000l;
+        while(sentMessages.get() == sentCount && System.currentTimeMillis() < 
targetTime)
+        {
+            Thread.sleep(100l);
+        }
+
+        assertFalse("Did not unblock on consuming messages", 
sentMessages.get() == sentCount);
+
+        for(int i = mostMessages; i < TEST_SIZE; i++)
+        {
+            if(_consumer.receive(1000l) == null)
+            {
+                break;
+            }
+        }
+
+        assertTrue("Not all messages were sent", sentMessages.get() == 
TEST_SIZE);
+        
+    }
+
+    /* Two producers on different queues
+     */
+
+    public void testCapacityExceededCausesBlockTwoConnections() throws 
Exception
+    {
+        AtomicInteger sentMessages = new AtomicInteger(0);
+        AtomicInteger sentMessages2 = new AtomicInteger(0);
+
+        _queue = getTestQueue();
+        AMQQueue queue2 = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, 
getTestQueueName() + "_2");
+
+        ((AMQSession<?,?>) 
_producerSession).declareAndBind((AMQDestination)_queue);
+
+        _producer = _producerSession.createProducer(_queue);
+
+        Connection secondProducerConnection = getConnection();
+        Session secondProducerSession = 
secondProducerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer secondProducer = 
secondProducerSession.createProducer(queue2);
+
+        sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, 
sentMessages);
+        sendMessagesAsync(secondProducer, secondProducerSession, TEST_SIZE, 
50L, sentMessages2);
+
+        while(!((AMQSession)_producerSession).isBrokerFlowControlled())
+        {
+            Thread.sleep(100l);
+        }
+        int sentCount = sentMessages.get();
+        assertFalse("Did not block before sending all messages", TEST_SIZE == 
sentCount);
+
+
+        while(!((AMQSession)secondProducerSession).isBrokerFlowControlled())
+        {
+            Thread.sleep(100l);
+        }
+        int sentCount2 = sentMessages2.get();
+        assertFalse("Did not block before sending all messages", TEST_SIZE == 
sentCount2);
+
+
+        _consumer = _consumerSession.createConsumer(_queue);
+        MessageConsumer consumer2 = _consumerSession.createConsumer(queue2);
+        _consumerConnection.start();
+
+
+        for(int i = 0; i < 2*TEST_SIZE; i++)
+        {
+            if(_consumer.receive(1000l) == null
+                && consumer2.receive(1000l) == null)
+            {
+                break;
+            }
+        }
+
+        assertEquals("Not all messages were sent from the first sender", 
TEST_SIZE, sentMessages.get());
+        assertEquals("Not all messages were sent from the second sender", 
TEST_SIZE, sentMessages2.get());
+    }
+
+    /*
+     * New producers are blocked
+     */
+
+    public void testCapacityExceededCausesBlockNewConnection() throws Exception
+    {
+        AtomicInteger sentMessages = new AtomicInteger(0);
+        AtomicInteger sentMessages2 = new AtomicInteger(0);
+
+        _queue = getTestQueue();
+
+        ((AMQSession<?,?>) 
_producerSession).declareAndBind((AMQDestination)_queue);
+
+        _producer = _producerSession.createProducer(_queue);
+
+        Connection secondProducerConnection = getConnection();
+        Session secondProducerSession = 
secondProducerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer secondProducer = 
secondProducerSession.createProducer(_queue);
+
+        sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, 
sentMessages);
+
+        while(!((AMQSession)_producerSession).isBrokerFlowControlled())
+        {
+            Thread.sleep(100l);
+        }
+        int sentCount = sentMessages.get();
+        assertFalse("Did not block before sending all messages", TEST_SIZE == 
sentCount);
+
+        sendMessagesAsync(secondProducer, secondProducerSession, TEST_SIZE, 
50L, sentMessages2);
+
+        while(!((AMQSession)secondProducerSession).isBrokerFlowControlled())
+        {
+            Thread.sleep(100l);
+        }
+        int sentCount2 = sentMessages2.get();
+        assertFalse("Did not block before sending all messages", TEST_SIZE == 
sentCount2);
+
+
+        _consumer = _consumerSession.createConsumer(_queue);
+        _consumerConnection.start();
+
+
+        for(int i = 0; i < 2*TEST_SIZE; i++)
+        {
+            if(_consumer.receive(2000l) == null)
+            {
+                break;
+            }
+        }
+
+        assertEquals("Not all messages were sent from the first sender", 
TEST_SIZE, sentMessages.get());
+        assertEquals("Not all messages were sent from the second sender", 
TEST_SIZE, sentMessages2.get());
+
+    }
+
+
+
+    private MessageSender sendMessagesAsync(final MessageProducer producer,
+                                            final Session producerSession,
+                                            final int numMessages,
+                                            long sleepPeriod,
+                                            AtomicInteger sentMessages)
+    {
+        MessageSender sender = new MessageSender(producer, producerSession, 
numMessages,sleepPeriod, sentMessages);
+        new Thread(sender).start();
+        return sender;
+    }
+
+    private class MessageSender implements Runnable
+    {
+        private final MessageProducer _senderProducer;
+        private final Session _senderSession;
+        private final int _numMessages;
+        private volatile JMSException _exception;
+        private CountDownLatch _exceptionThrownLatch = new CountDownLatch(1);
+        private long _sleepPeriod;
+        private final AtomicInteger _sentMessages;
+
+        public MessageSender(MessageProducer producer, Session 
producerSession, int numMessages, long sleepPeriod, AtomicInteger sentMessages)
+        {
+            _senderProducer = producer;
+            _senderSession = producerSession;
+            _numMessages = numMessages;
+            _sleepPeriod = sleepPeriod;
+            _sentMessages = sentMessages;
+        }
+
+        public void run()
+        {
+            try
+            {
+                sendMessages(_senderProducer, _senderSession, _numMessages, 
_sleepPeriod, _sentMessages);
+            }
+            catch (JMSException e)
+            {
+                _exception = e;
+                _exceptionThrownLatch.countDown();
+            }
+        }
+
+        public Exception awaitSenderException(long timeout) throws 
InterruptedException
+        {
+            _exceptionThrownLatch.await(timeout, TimeUnit.MILLISECONDS);
+            return _exception;
+        }
+    }
+
+    private void sendMessages(MessageProducer producer, Session 
producerSession, int numMessages, long sleepPeriod, AtomicInteger sentMessages)
+            throws JMSException
+    {
+
+        for (int msg = 0; msg < numMessages; msg++)
+        {
+            producer.send(nextMessage(msg, producerSession));
+            sentMessages.incrementAndGet();
+
+
+            try
+            {
+                ((AMQSession<?,?>)producerSession).sync();
+            }
+            catch (AMQException e)
+            {
+                e.printStackTrace();
+                throw new RuntimeException(e);
+            }
+
+            try
+            {
+                Thread.sleep(sleepPeriod);
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private static final byte[] BYTE_32K = new byte[32*1024];
+
+    private Message nextMessage(int msg, Session producerSession) throws 
JMSException
+    {
+        BytesMessage send = producerSession.createBytesMessage();
+        send.writeBytes(BYTE_32K);
+        send.setIntProperty("msg", msg);
+
+        return send;
+    }
+}

Modified: qpid/trunk/qpid/java/test-profiles/CPPExcludes
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/CPPExcludes?rev=1335290&r1=1335289&r2=1335290&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/CPPExcludes (original)
+++ qpid/trunk/qpid/java/test-profiles/CPPExcludes Mon May  7 22:40:52 2012
@@ -121,6 +121,8 @@ org.apache.qpid.test.client.message.Sele
 
 //QPID-942 : Implemented Channel.Flow based Producer Side flow control to the 
Java Broker (not in CPP Broker)
 org.apache.qpid.server.queue.ProducerFlowControlTest#*
+//QPID-3986 : Flow control invoked on total store disk usage
+org.apache.qpid.server.store.StoreOverfullTest#*
 
 
org.apache.qpid.test.client.RollbackOrderTest#testOrderingAfterRollbackOnMessage#*
 

Modified: qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes?rev=1335290&r1=1335289&r2=1335290&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes (original)
+++ qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes Mon May  7 
22:40:52 2012
@@ -30,7 +30,6 @@ org.apache.qpid.test.unit.xa.TopicTest#t
 org.apache.qpid.test.unit.xa.TopicTest#testDurSubCrash
 org.apache.qpid.test.unit.xa.TopicTest#testRecover
 
-
 org.apache.qpid.server.store.MessageStoreTest#testMessagePersistence
 org.apache.qpid.server.store.MessageStoreTest#testMessageRemoval
 org.apache.qpid.server.store.MessageStoreTest#testBindingPersistence
@@ -39,6 +38,7 @@ org.apache.qpid.server.store.MessageStor
 org.apache.qpid.server.store.MessageStoreTest#testDurableQueueRemoval
 org.apache.qpid.server.store.MessageStoreTest#testExchangePersistence
 org.apache.qpid.server.store.MessageStoreTest#testDurableExchangeRemoval
+org.apache.qpid.server.store.StoreOverfullTest#*
 
 org.apache.qpid.server.store.berkeleydb.*
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to