Ehud Eshet created AMQ-5537:
-------------------------------
Summary: Network Connector Throughput
Key: AMQ-5537
URL: https://issues.apache.org/jira/browse/AMQ-5537
Project: ActiveMQ
Issue Type: Improvement
Components: Connector
Affects Versions: 5.x
Environment: Network of Brokers. Platform agnostic. Local Broker has a
networkConnector defined to forward all messages to a remote broker.
Reporter: Ehud Eshet
Fix For: 5.11.0
*Requirement*
1. Allow network connector to use transactions when forwarding persistent
messages.
2. Provide the following new network connector properties:
maxMessagesPerTransaction - when specified and greater than 1, use transactions.
maxTransactionLatencyMillis - commit immediately when time passed since last
commit is more than specified.
Let's say both parameters are set as 1000.
Network connector should commit after every 1000 messages or when more than
1000ms have passed since the last commit (whichever comes first).
*Background*
Throughput for persistent messages is significantly lower when transactions are not used.
When using transactions and committing every 1000 messages, throughput on local
broker with levelDB is about 12,000 messages of 1KB per second.
Network connector does not use transactions. Thus, its throughput is limited to
a few hundred messages per second.
When imitating network connector functionality (receive from local broker and
send to remote broker) using transactions on both sessions, I managed to have a
sustained throughput of 10,000 messages/sec stored on local broker plus up to
11,000 messages/s forwarded to remote broker (forwarding throughput must be
higher to allow catch up after reconnect).
*Sample code*
{code:title=TransactionalStoreAndForward.java|borderStyle=solid}
import java.util.Date;
import javax.jms.*;
import javax.jms.Connection;
import javax.jms.Message;
import org.apache.activemq.*;
import org.apache.activemq.broker.*;
public class TransactionalStoreAndForward implements Runnable
{
private final String m_queueName;
private final ActiveMQConnectionFactory m_fromAMQF, m_toAMQF;
private Connection m_fromConn = null, m_toConn = null;
private Session m_fromSess = null, m_toSess = null;
private MessageConsumer m_msgConsumer = null;
private MessageProducer m_msgProducer = null;
private boolean m_cont = true;
public static final int MAX_MESSAGES_PER_TRANSACTION = 500;
public static final long MAX_TRANSACTION_LATENCY_MILLIS = 5000L;
public TransactionalStoreAndForward(String fromUri, String toUri,
String queueName)
{
m_fromAMQF = new ActiveMQConnectionFactory(fromUri);
m_toAMQF = new ActiveMQConnectionFactory(toUri);
m_queueName = queueName;
}
@Override
public void run()
{
while (m_cont)
{
connect();
process();
}
}
private void process()
{
long txMessages = 0, totalMessages = 0, lastPrintMessages = 0;
long startTime = 0L;
long lastTxTime = startTime, lastPrintTime = startTime;
Message msg = null;
try {
while (m_cont)
{
while ((msg =
m_msgConsumer.receive(MAX_TRANSACTION_LATENCY_MILLIS)) != null)
{
if (startTime == 0) {
startTime =
System.currentTimeMillis();
lastTxTime = startTime;
lastPrintTime = startTime;
}
m_msgProducer.send(msg);
txMessages++;
totalMessages++;
if (txMessages ==
MAX_MESSAGES_PER_TRANSACTION ||
System.currentTimeMillis() - lastTxTime > MAX_TRANSACTION_LATENCY_MILLIS)
{
m_toSess.commit();
m_fromSess.commit();
lastTxTime =
System.currentTimeMillis();
txMessages = 0;
}
if (System.currentTimeMillis() -
lastPrintTime > 10000L) {
System.out.println("processed "
+ (totalMessages - lastPrintMessages) + " messages during last 10 seconds. Avg.
messages/s: " + (totalMessages * 1000L / (System.currentTimeMillis() -
startTime)) + " at " + new Date());
lastPrintTime =
System.currentTimeMillis();
lastPrintMessages =
totalMessages;
}
}
if (txMessages > 0)
{
m_toSess.commit();
m_fromSess.commit();
lastTxTime = System.currentTimeMillis();
txMessages = 0;
}
else {
System.out.println("Idle for more than
a minute at " + new Date());
}
}
}
catch(JMSException jmse)
{
System.out.println("About to rollback " + txMessages +
" messages due to: " + jmse.getMessage());
try {
m_toSess.rollback();
m_fromSess.rollback();
System.out.println("Rollback completed. will
reconnect soon ...");
}
catch (JMSException re)
{
System.out.println("Rollback failed !!!");
re.printStackTrace();
}
}
}
private void connect()
{
boolean isNotOK = true;
String target = null;
while (isNotOK)
{
try {
if (m_fromConn != null)
{
m_fromConn.close();
m_fromConn = null;
}
if (m_toConn != null)
{
m_toConn.close();
m_toConn = null;
}
target = m_fromAMQF.getBrokerURL();
m_fromConn = m_fromAMQF.createConnection();
m_fromConn.start();
m_fromSess = m_fromConn.createSession(true,
Session.AUTO_ACKNOWLEDGE);
Destination fromDest =
m_fromSess.createQueue(m_queueName);
m_msgConsumer =
m_fromSess.createConsumer(fromDest);
target = m_toAMQF.getBrokerURL();
m_toConn = m_toAMQF.createConnection();
m_toConn.start();
m_toSess = m_toConn.createSession(true,
Session.AUTO_ACKNOWLEDGE);
Destination toDest =
m_toSess.createQueue(m_queueName);
m_msgProducer = m_toSess.createProducer(toDest);
isNotOK = false;
System.out.println("Successful connection at "
+ new Date());
}
catch(Exception e) {
System.out.println("Failed to connect to " +
target + " due to: " + e.getMessage());
try {
Thread.sleep(60000L);
} catch (InterruptedException e1) {}
System.out.println("About to retry connection
at " + new Date());
}
}
}
public void cleanup() throws Exception
{
m_cont = false;
if (m_fromConn != null)
{
m_fromConn.close();
m_fromConn = null;
}
if (m_toConn != null)
{
m_toConn.close();
m_toConn = null;
}
}
public static void main(String[] args) throws Exception
{
BrokerService broker =
BrokerFactory.createBroker("xbean:activemq_gateway.xml", true);
broker.waitUntilStarted();
TransactionalStoreAndForward tsaf = new
TransactionalStoreAndForward("vm://AuditGW", "tcp://10.2.154.51:61616",
"AUDIT.EVENT");
Thread t = new Thread(tsaf);
t.start();
t.join();
tsaf.cleanup();
broker.stop();
broker.waitUntilStopped();
}
}
{code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)