Here is a sample implementation of my code:
Class MinaTCPClient: This class is a singleton - only one instance is used.
public class MinaTCPClient {
public static void process(final MinaEndpoint minaEndpoint, int size)
{
try
{
Thread[] clientThreads = new Thread[size];
int index = 0;
for (Thread clientThread: clientThreads)
{
clientThread = new Thread(String.valueOf(index))
{
public void run()
{
try
{
String isoXml =
"<request><echodata>1234</echodata></request>";
String response =
String.valueOf(minaEndpoint.process(isoXml));
LOG.debug("Response: " + response);
}
catch(Exception ex)
{
ex.printStackTrace();
}
}
};
clientThread.start();
}
for (Thread clientThread : clientThreads) {
clientThread.join();
}
}
catch(Exception ex)
{
ex.printStackTrace();
}
}
}
Class MinaEndpoint:
public class MinaEndpoint()
{
private IoSession session;
private CountDownLatch latch;
private long timeout = 55000;
private SocketAddress address;
private IoConnector connector;
private boolean sync;
private IoSessionConfig connectorConfig;
private ExecutorService workerPool;
public MinaEndpoint(String host, int port)
{
address = new InetSocketAddress(host, port);
final int processorCount =
Runtime.getRuntime().availableProcessors() + 1;
connector = new NioSocketConnector(processorCount);
connectorConfig = connector.getSessionConfig();
workerPool = new UnorderedThreadPoolExecutor(16);
connector.getFilterChain().addLast("threadPool", new
ExecutorFilter(workerPool));
connector.getFilterChain().addLast("logger", new
LoggingFilter());
connector.setConnectTimeoutMillis(timeout);
connector.getFilterChain().addLast("codec", new
ProtocolCodecFilter(new
Mina2ISO8583XMLCodecFactory()));
}
public Object process(Object body) throws Exception
{
connector.getSessionConfig().setAll(connectorConfig);
connector.setHandler(new ResponseHandler());
ConnectFuture future = connector.connect(address);
future.awaitUninterruptibly();
session = future.getSession();
latch = new CountDownLatch(1);
ResponseHandler handler = (ResponseHandler) session.getHandler();
handler.reset();
session.write(body);
boolean done = latch.await(timeout, TimeUnit.MILLISECONDS);
if(!done)
throw new TimeoutException("Timeout waiting for response for " +
body);
handler = (ResponseHandler) session.getHandler();
Object response = handler.getMessage();
return response;
}
private final class ResponseHandler extends IoHandlerAdapter {
private Object message;
private Throwable cause;
private boolean messageReceived;
public void reset() {
this.message = null;
this.cause = null;
this.messageReceived = false;
}
@Override
public void messageReceived(IoSession ioSession, Object message)
throws Exception {
this.message = message;
messageReceived = true;
cause = null;
countDown();
}
protected void countDown() {
CountDownLatch downLatch = latch;
if (downLatch != null) {
downLatch.countDown();
}
}
@Override
public void sessionClosed(IoSession session) throws Exception {
if (sync && !messageReceived) {
countDown();
}
}
@Override
public void exceptionCaught(IoSession ioSession, Throwable cause) {
this.message = null;
this.messageReceived = false;
this.cause = cause;
if (ioSession != null) {
ioSession.close(true);
}
}
public Throwable getCause() {
return this.cause;
}
public Object getMessage() {
return this.message;
}
public boolean isMessageReceived() {
return messageReceived;
}
}
}
NB: CODEC classes arent included
--
View this message in context:
http://apache-mina.10907.n7.nabble.com/Understanding-Apache-Mina-2-Request-Response-mapping-tp42337p42366.html
Sent from the Apache MINA User Forum mailing list archive at Nabble.com.