[ https://issues.apache.org/jira/browse/AMQ-5537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Gary Tully updated AMQ-5537: ---------------------------- Fix Version/s: (was: 5.11.0) > Network Connector Throughput > ---------------------------- > > Key: AMQ-5537 > URL: https://issues.apache.org/jira/browse/AMQ-5537 > Project: ActiveMQ > Issue Type: Improvement > Components: Connector > Affects Versions: 5.x > Environment: Network of Brokers. Platform agnostic. Local Broker has > a networkConnector defined to forward all messages to a remote broker. > Reporter: Ehud Eshet > > *Requirement* > 1. Allow network connector to use transactions when forwarding persistent > messages. > 2. Provide the following new network connector properties: > maxMessagesPerTransaction - when specified and great than 1, use transactions. > maxTransactionLatencyMillis - commit immediately when time passed since last > commit is more than specified. > Let's say both parameters are set as 1000. > Network connector should commit after every 1000 messages or when more than > 1000ms passed since last commit (the sooner). > *Background* > Persistent messages throughput is significantly slower. > When using transactions and committing every 1000 messages, throughput on > local broker with levelDB is about 12,000 messages of 1KB per second. > Network connector does not use transactions. Thus, its throughput is limited > to few hundreds messages per second. > When imitating network connector functionality (receive from local broker and > send to remote broker) using transactions on both sessions, I managed to have > a sustained throughput of 10,000 messages/sec stored on local broker plus up > to 11,000 messages/s forwarded to remote broker (forwarding throughput must > be higher to allow catch up after reconnect). > *Sample code* > {code:title=TransactionalStoreAndForward.java|borderStyle=solid} > import java.util.Date; > import javax.jms.*; > import javax.jms.Connection; > import javax.jms.Message; > import org.apache.activemq.*; > import org.apache.activemq.broker.*; > public class TransactionalStoreAndForward implements Runnable > { > private final String m_queueName; > private final ActiveMQConnectionFactory m_fromAMQF, m_toAMQF; > > private Connection m_fromConn = null, m_toConn = null; > private Session m_fromSess = null, m_toSess = null; > private MessageConsumer m_msgConsumer = null; > private MessageProducer m_msgProducer = null; > > private boolean m_cont = true; > > public static final int MAX_MESSAGES_PER_TRANSACTION = 500; > public static final long MAX_TRANSACTION_LATENCY_MILLIS = 5000L; > > public TransactionalStoreAndForward(String fromUri, String toUri, > String queueName) > { > m_fromAMQF = new ActiveMQConnectionFactory(fromUri); > m_toAMQF = new ActiveMQConnectionFactory(toUri); > m_queueName = queueName; > } > > @Override > public void run() > { > while (m_cont) > { > connect(); > process(); > } > } > > private void process() > { > long txMessages = 0, totalMessages = 0, lastPrintMessages = 0; > long startTime = 0L; > long lastTxTime = startTime, lastPrintTime = startTime; > > Message msg = null; > > try { > while (m_cont) > { > while ((msg = > m_msgConsumer.receive(MAX_TRANSACTION_LATENCY_MILLIS)) != null) > { > if (startTime == 0) { > startTime = > System.currentTimeMillis(); > lastTxTime = startTime; > lastPrintTime = startTime; > } > > m_msgProducer.send(msg); > txMessages++; > totalMessages++; > > if (txMessages == > MAX_MESSAGES_PER_TRANSACTION || > > System.currentTimeMillis() - lastTxTime > MAX_TRANSACTION_LATENCY_MILLIS) > { > m_toSess.commit(); > m_fromSess.commit(); > lastTxTime = > System.currentTimeMillis(); > txMessages = 0; > } > > if (System.currentTimeMillis() - > lastPrintTime > 10000L) { > System.out.println("processed " > + (totalMessages - lastPrintMessages) + " messages during last 10 seconds. > Avg. messages/s: " + (totalMessages * 1000L / (System.currentTimeMillis() - > startTime)) + " at " + new Date()); > lastPrintTime = > System.currentTimeMillis(); > lastPrintMessages = > totalMessages; > } > } > > if (txMessages > 0) > { > m_toSess.commit(); > m_fromSess.commit(); > lastTxTime = System.currentTimeMillis(); > txMessages = 0; > } > else { > System.out.println("Idle for more than > a minute at " + new Date()); > } > } > } > catch(JMSException jmse) > { > System.out.println("About to rollback " + txMessages + > " messages due to: " + jmse.getMessage()); > try { > m_toSess.rollback(); > m_fromSess.rollback(); > System.out.println("Rollback completed. will > reconnect soon ..."); > } > catch (JMSException re) > { > System.out.println("Rollback failed !!!"); > re.printStackTrace(); > } > } > } > > private void connect() > { > boolean isNotOK = true; > String target = null; > while (isNotOK) > { > try { > if (m_fromConn != null) > { > m_fromConn.close(); > m_fromConn = null; > } > > if (m_toConn != null) > { > m_toConn.close(); > m_toConn = null; > } > > target = m_fromAMQF.getBrokerURL(); > m_fromConn = m_fromAMQF.createConnection(); > m_fromConn.start(); > m_fromSess = m_fromConn.createSession(true, > Session.AUTO_ACKNOWLEDGE); > Destination fromDest = > m_fromSess.createQueue(m_queueName); > m_msgConsumer = > m_fromSess.createConsumer(fromDest); > > target = m_toAMQF.getBrokerURL(); > m_toConn = m_toAMQF.createConnection(); > m_toConn.start(); > m_toSess = m_toConn.createSession(true, > Session.AUTO_ACKNOWLEDGE); > Destination toDest = > m_toSess.createQueue(m_queueName); > m_msgProducer = m_toSess.createProducer(toDest); > isNotOK = false; > System.out.println("Successful connection at " > + new Date()); > } > catch(Exception e) { > System.out.println("Failed to connect to " + > target + " due to: " + e.getMessage()); > try { > Thread.sleep(60000L); > } catch (InterruptedException e1) {} > > System.out.println("About to retry connection > at " + new Date()); > } > } > } > > public void cleanup() throws Exception > { > m_cont = false; > > if (m_fromConn != null) > { > m_fromConn.close(); > m_fromConn = null; > } > > if (m_toConn != null) > { > m_toConn.close(); > m_toConn = null; > } > } > > public static void main(String[] args) throws Exception > { > BrokerService broker = > BrokerFactory.createBroker("xbean:activemq_gateway.xml", true); > broker.waitUntilStarted(); > TransactionalStoreAndForward tsaf = new > TransactionalStoreAndForward("vm://AuditGW", "tcp://10.2.154.51:61616", > "AUDIT.EVENT"); > Thread t = new Thread(tsaf); > t.start(); > t.join(); > tsaf.cleanup(); > broker.stop(); > broker.waitUntilStopped(); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)