[ 
https://issues.apache.org/jira/browse/AMQ-5537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Tully updated AMQ-5537:
----------------------------
    Fix Version/s:     (was: 5.11.0)

> Network Connector Throughput
> ----------------------------
>
>                 Key: AMQ-5537
>                 URL: https://issues.apache.org/jira/browse/AMQ-5537
>             Project: ActiveMQ
>          Issue Type: Improvement
>          Components: Connector
>    Affects Versions: 5.x
>         Environment: Network of Brokers. Platform agnostic. Local Broker has 
> a networkConnector defined to forward all messages to a remote broker.
>            Reporter: Ehud Eshet
>
> *Requirement*
> 1.  Allow network connector to use transactions when forwarding persistent 
> messages.
> 2. Provide the following new network connector properties:
> maxMessagesPerTransaction - when specified and greater than 1, use transactions.
> maxTransactionLatencyMillis - commit immediately when the time elapsed since 
> the last commit exceeds the specified value.
> For example, if both parameters are set to 1000, the network connector should 
> commit after every 1000 messages or when more than 1000 ms have passed since 
> the last commit, whichever comes first.
> *Background*
> Persistent message throughput is significantly slower when transactions are 
> not used.
> When using transactions and committing every 1000 messages, throughput on 
> local broker with levelDB is about 12,000 messages of 1KB per second.
> The network connector does not use transactions. Thus, its throughput is 
> limited to a few hundred messages per second.
> When imitating the network connector functionality (receive from the local 
> broker and send to the remote broker) using transactions on both sessions, I 
> achieved a sustained throughput of 10,000 messages/s stored on the local 
> broker plus up to 11,000 messages/s forwarded to the remote broker (forwarding 
> throughput must be higher to allow catching up after a reconnect).
> *Sample code*
> {code:title=TransactionalStoreAndForward.java|borderStyle=solid}
> import java.util.Date;
> import javax.jms.*;
> import javax.jms.Connection;
> import javax.jms.Message;
> import org.apache.activemq.*;
> import org.apache.activemq.broker.*;
> public class TransactionalStoreAndForward implements Runnable 
> {
>       private final String m_queueName;
>       private final ActiveMQConnectionFactory m_fromAMQF, m_toAMQF;
>       
>       private Connection m_fromConn = null, m_toConn = null;
>       private Session m_fromSess = null, m_toSess = null;
>       private MessageConsumer m_msgConsumer = null;
>       private MessageProducer m_msgProducer = null;
>       
>       private boolean m_cont = true;
>       
>       public static final int MAX_MESSAGES_PER_TRANSACTION = 500;
>       public static final long MAX_TRANSACTION_LATENCY_MILLIS = 5000L;
>       
>       public TransactionalStoreAndForward(String fromUri, String toUri, 
> String queueName)
>       {
>               m_fromAMQF = new ActiveMQConnectionFactory(fromUri);
>               m_toAMQF = new ActiveMQConnectionFactory(toUri);
>               m_queueName = queueName;
>       }
>       
>       @Override
>       public void run() 
>       {
>               while (m_cont)
>               {
>                       connect();
>                       process();
>               }
>       }
>       
>       private void process()
>       {
>               long txMessages = 0, totalMessages = 0, lastPrintMessages = 0;
>               long startTime = 0L;
>               long lastTxTime = startTime, lastPrintTime = startTime;
>               
>               Message msg = null;
>               
>               try {
>                       while (m_cont)
>                       {
>                               while ((msg = 
> m_msgConsumer.receive(MAX_TRANSACTION_LATENCY_MILLIS)) != null)
>                               {
>                                       if (startTime == 0) {
>                                               startTime = 
> System.currentTimeMillis();
>                                               lastTxTime = startTime;
>                                               lastPrintTime = startTime;
>                                       }
>                                       
>                                       m_msgProducer.send(msg);
>                                       txMessages++;
>                                       totalMessages++;
>                                       
>                                       if (txMessages == 
> MAX_MESSAGES_PER_TRANSACTION || 
>                                                       
> System.currentTimeMillis() - lastTxTime > MAX_TRANSACTION_LATENCY_MILLIS)
>                                       {
>                                               m_toSess.commit();
>                                               m_fromSess.commit();
>                                               lastTxTime = 
> System.currentTimeMillis();
>                                               txMessages = 0;
>                                       }
>                                       
>                                       if (System.currentTimeMillis() - 
> lastPrintTime > 10000L) {
>                                               System.out.println("processed " 
> + (totalMessages - lastPrintMessages) + " messages during last 10 seconds. 
> Avg. messages/s: " + (totalMessages * 1000L / (System.currentTimeMillis() - 
> startTime)) + " at " + new Date());
>                                               lastPrintTime = 
> System.currentTimeMillis();
>                                               lastPrintMessages = 
> totalMessages;
>                                       }
>                               }
>                               
>                               if (txMessages > 0)
>                               {
>                                       m_toSess.commit();
>                                       m_fromSess.commit();
>                                       lastTxTime = System.currentTimeMillis();
>                                       txMessages = 0;
>                               }
>                               else {
>                                       System.out.println("Idle for more than 
> a minute at " + new Date());
>                               }
>                       }
>               }
>               catch(JMSException jmse)
>               {
>                       System.out.println("About to rollback " + txMessages + 
> " messages due to: " + jmse.getMessage());
>                       try {
>                               m_toSess.rollback();
>                               m_fromSess.rollback();
>                               System.out.println("Rollback completed. will 
> reconnect soon ...");
>                       }
>                       catch (JMSException re)
>                       {
>                               System.out.println("Rollback failed !!!");
>                               re.printStackTrace();
>                       }
>               }
>       }
>       
>       private void connect()
>       {
>               boolean isNotOK = true;
>               String target = null;
>               while (isNotOK) 
>               {
>                       try {
>                               if (m_fromConn != null) 
>                               {
>                                       m_fromConn.close();
>                                       m_fromConn = null;
>                               }
>                               
>                               if (m_toConn != null) 
>                               {
>                                       m_toConn.close();
>                                       m_toConn = null;
>                               }
>                               
>                               target = m_fromAMQF.getBrokerURL();
>                               m_fromConn = m_fromAMQF.createConnection();
>                               m_fromConn.start();
>                               m_fromSess = m_fromConn.createSession(true, 
> Session.AUTO_ACKNOWLEDGE);
>                               Destination fromDest = 
> m_fromSess.createQueue(m_queueName);
>                               m_msgConsumer = 
> m_fromSess.createConsumer(fromDest);
>                               
>                               target = m_toAMQF.getBrokerURL();
>                               m_toConn = m_toAMQF.createConnection();
>                               m_toConn.start();
>                               m_toSess = m_toConn.createSession(true, 
> Session.AUTO_ACKNOWLEDGE);
>                               Destination toDest = 
> m_toSess.createQueue(m_queueName);
>                               m_msgProducer = m_toSess.createProducer(toDest);
>                               isNotOK = false;
>                               System.out.println("Successful connection at " 
> + new Date());
>                       }
>                       catch(Exception e) {
>                               System.out.println("Failed to connect to " + 
> target + " due to: " + e.getMessage());
>                               try {
>                                       Thread.sleep(60000L);
>                               } catch (InterruptedException e1) {}
>                               
>                               System.out.println("About to retry connection 
> at " + new Date());
>                       }
>               }
>       }
>       
>       public void cleanup() throws Exception
>       {
>               m_cont = false;
>               
>               if (m_fromConn != null) 
>               {
>                       m_fromConn.close();
>                       m_fromConn = null;
>               }
>               
>               if (m_toConn != null) 
>               {
>                       m_toConn.close();
>                       m_toConn = null;
>               }
>       }
>       
>       public static void main(String[] args) throws Exception
>       {
>               BrokerService broker = 
> BrokerFactory.createBroker("xbean:activemq_gateway.xml", true);
>               broker.waitUntilStarted();
>               TransactionalStoreAndForward tsaf = new 
> TransactionalStoreAndForward("vm://AuditGW", "tcp://10.2.154.51:61616", 
> "AUDIT.EVENT");
>               Thread t = new Thread(tsaf);
>               t.start();
>               t.join();
>               tsaf.cleanup();
>               broker.stop();
>               broker.waitUntilStopped();
>       }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to