Hi Tim,

Please find attached a sample application testing transaction
throughput for one publishing and one consuming connections.
By running it on my machine I see a difference in performance of ~15%
between 0.11.0 and 0.20.0.

Our performance test suite uses 10 publishing and 10 consuming
connections. The difference in performance is a bit bigger with it.

Btw, you can run the suite from qpid-java project using

mvn integration-test -f ./perftests/pom.xml -DskipTests=true \
-Dperftests=qpid-jms-client  -Dqpid.disttest.duration=20000   \
-Dperftests.hillclimb=true -Dperftests.hillclimb.minimum_delta=100
-Dperftests.hillclimb.max_runs=2 \
-Dperftests.test-config=./perftests/etc/testdefs \
-Dqpid-jms-client-version=0.11.0 \
 /// <--- parameters below are default values and can be skipped
-Dperftests.messaging-hostport-plain=localhost:5672 \
-Dperftests.messaging-hostport-tls=localhost:5671 \
-Dperftests.messaging-user=guest \
-Dperftests.messaging-password=guest \
-Dperftests.broker-virtualhostnode=default \
-Dperftests.broker-virtualhost=default \
-Dperftests.manangement-url=http://localhost:8080 \
-Dperftests.manangement-user=guest \
-Dperftests.manangement-password=guest \
-Dperftests.broker-virtualhostnode=default


The basic authentication needs to be enabled on http port in order to
run the suite. The suite contains 2 transaction and 2 auto ack tests.
Only transactions tests are affected. The difference in performance
for Auto ack is unnoticeable.
Potentially you can comment auto-ack tests in
./perftests/etc/testdefs/defaultTests.js and run only transaction
tests.

Kind Regards,
Alex

On 28 February 2017 at 17:04, Timothy Bish <tabish...@gmail.com> wrote:
> On 02/28/2017 08:10 AM, Oleksandr Rudyy wrote:
>>
>> Hi all,
>>
>> After upgrading Qpid JMS Client from version 0.11.0 to version 0.20.0
>> our performance test results  dropped approximately on 20-25% in the
>> performance tests testing transaction performance with trunk version
>> of Qpid Java Broker.
>>
>> Changing client back to v0.11.0 and re-running the tests yields me the
>> same results as before the upgrade.
>>
>> What changes on the client could be the reason for a performance drop?
>
>
> There was a great deal of change in the client to implement the extra bits
> needed to meet the JMS 2.0 specification requirements along with other
> changes to handle of the more interesting edge cases around JMS usages so
> it'd be hard to just pull out one change as the culprit without more
> research.  I'm not that surprised that there were some impacts.
>
> What would be helpful would be to pull out one affected test case where you
> see this drop in performance into a standalone test so we could run it in
> the Qpid JMS test suite and do some digging.
>
>> The performance test creates 10 publishing connections and 10
>> consuming connections. They produce/consume every message in separate
>> transactions. The consumer is synchronous. Each pair of producer and
>> consumer uses its own queue.
>>
>> Kind Regards,
>> Alex
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
>> For additional commands, e-mail: users-h...@qpid.apache.org
>>
>>
>
>
> --
> Tim Bish
> twitter: @tabish121
> blog: http://timbish.blogspot.com/
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
> For additional commands, e-mail: users-h...@qpid.apache.org
>
package org.apache.qpid.perfomance;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class BenchMark
{
    private final String _broker;
    private final String _user;
    private final String _password;
    private final int _duration;
    private final int _messageSize;
    private final String _queueName;

    public BenchMark(final String broker,
                     final String user,
                     final String password,
                     final String queueName,
                     final int duration,
                     final int messageSize)
    {
        _broker = broker;
        _user= user;
        _password = password;
        _duration = duration;
        _messageSize = messageSize;
        _queueName = queueName;
    }


    public static void main(String[] args) throws Exception
    {
        String broker = System.getProperty("broker", "localhost:5672");
        String user = System.getProperty("user", "guest");
        String password = System.getProperty("password", "guest");
        String testQueue = System.getProperty("queue", "test");
        int duration = Integer.getInteger("duration", 60000);
        int messageSize = Integer.getInteger("size", 1024);
        System.out.println(String.format(
                "Testing performance for duration of %d seconds against broker '%s'. Test queue is '%s'.",
                duration/1000,
                broker,
                testQueue));
        BenchMark benchMark = new BenchMark(broker, user, password, testQueue, duration, messageSize);
        Result result = benchMark.run();
        System.out.println(result);
    }

    public Result run()
            throws NamingException, JMSException, InterruptedException
    {
        Connection publishingConnection = createConnection();
        MessageHandler handler;
        long start;

        try
        {
            Session producerSession = publishingConnection.createSession(true, Session.SESSION_TRANSACTED);
            Queue queue = producerSession.createQueue(_queueName);
            Connection consumerConnection = createConnection();
            try
            {
                Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
                MessageConsumer consumer = consumerSession.createConsumer(queue);
                handler = new MessageHandler(consumerSession);
                consumer.setMessageListener(handler);
                consumerConnection.start();
                BytesMessage message = producerSession.createBytesMessage();
                message.writeBytes(new byte[_messageSize]);
                MessageProducer producer = producerSession.createProducer(queue);
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                AtomicLong messageCounter = new AtomicLong();
                start = System.currentTimeMillis();
                long end = start + _duration;
                long currentTime;
                do
                {
                    producer.send(message);
                    producerSession.commit();
                    long numberOfProducedMessages = messageCounter.incrementAndGet();
                    currentTime = System.currentTimeMillis();

                    while ((numberOfProducedMessages - handler.getMessageCount()) > 100 && currentTime < end )
                    {
                        Thread.sleep(50);
                    }
                }
                while (currentTime < end && !handler.exceptionOccurred());
                handler.end();

                // consume remaining
                while (!handler.exceptionOccurred() && messageCounter.get() > handler.getMessageCount())
                {
                    Thread.sleep(50);
                }

                consumer.close();
                consumerSession.close();
            }
            finally
            {
                consumerConnection.close();
            }
            producerSession.close();
        }
        finally
        {
            publishingConnection.close();
        }

        if (handler != null)
        {
            double testDuration = handler.getEndTime() - start;
            if (testDuration > 0)
            {
                final double messageThroughput = (double) handler.getMessageNumber() / testDuration;
                final long bytesThroughput =
                        (long) Math.ceil((double) handler.getMessageCount() / testDuration * (double) _messageSize);
                return new Result(messageThroughput, bytesThroughput, handler.getException());
            }
        }
        return null;
    }

    private Connection createConnection() throws NamingException, JMSException
    {
        final Properties properties = new Properties();
        properties.put("java.naming.factory.initial", "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        properties.put("connectionfactory.myFactory", "amqp://" + _broker);
        Context context = new InitialContext(properties);
        Connection connection = null;
        try
        {
            ConnectionFactory factory = (ConnectionFactory) context.lookup("myFactory");
            connection = factory.createConnection(_user, _password);
        }
        finally
        {
            context.close();
        }
        return connection;
    }

    private static class MessageHandler implements MessageListener
    {
        private final AtomicLong _messageCounter = new AtomicLong();
        private final Session _session;
        private volatile JMSException _exception;
        private volatile long _endTime;
        private volatile long _messageNumber;

        public MessageHandler(final Session session)
        {
            _session = session;
        }

        @Override
        public void onMessage(final Message message)
        {
            try
            {
                _session.commit();
            }
            catch (JMSException e)
            {
                _exception = e;
            }
            _messageCounter.incrementAndGet();
        }

        public long getMessageCount()
        {
            return _messageCounter.get();
        }

        public boolean exceptionOccurred()
        {
            return _exception != null;
        }

        public JMSException getException()
        {
            return _exception;
        }

        public void end()
        {
            _endTime = System.currentTimeMillis();
            _messageNumber = _messageCounter.get();
        }

        public long getEndTime()
        {
            return _endTime;
        }

        public long getMessageNumber()
        {
            return _messageNumber;
        }
    }


    public static class Result
    {
        private final JMSException _exception;
        double _messageThroughput;
        long _bytesThroughput;

        public Result(final double messageThroughput, final long bytesThroughput, JMSException exception)
        {
            _messageThroughput = messageThroughput;
            _bytesThroughput = bytesThroughput;
            _exception = exception;
        }

        public double getMessageThroughput()
        {
            return _messageThroughput;
        }

        public long getBytesThroughput()
        {
            return _bytesThroughput;
        }

        @Override
        public String toString()
        {
            String result = String.format("Throughput: %.2f messages/second, %d bytes/second",
                                          _messageThroughput,
                                          _bytesThroughput);
            if (_exception != null)
            {
                result += System.getProperty("line.separator");
                StringWriter sw = new StringWriter();
                PrintWriter pw = new PrintWriter(sw);
                _exception.printStackTrace(pw);
                result += sw.toString();
            }
            return result;
        }
    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
For additional commands, e-mail: users-h...@qpid.apache.org

Reply via email to