Hi All,
We are working on an IoT project and use the "org.apache.mina" library
(version 2.0.16) in our gateway adaptor (GA, written in Java).
A device connects to our GA, and the GA sends an acknowledgement back to the
device for each message.
We use multi-threading to handle each packet from the device(s). A single
packet from a device can also contain multiple messages, separated by a
delimiter.
E.g. the following packet contains 5 messages. Here the delimiter is "Header":
HeaderMsg1HeaderMsg2HeaderMsg3HeaderMsg4HeaderMsg5
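For clarity, here is a minimal sketch of the kind of splitting we mean. It is not our actual handleMultipleMessage() (that method is not included in this mail); the class name HeaderSplitter and its split() method are made up for illustration. It also uses the literal text "Header", whereas our real code works on a hex string, so the real delimiter would be in hex form.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of our gateway code.
final class HeaderSplitter {
    // Assumed delimiter, taken from the example packet above.
    static final String DELIMITER = "Header";

    // Splits a packet into messages; each message keeps its leading delimiter.
    static List<String> split(String packet) {
        List<String> messages = new ArrayList<>();
        int start = packet.indexOf(DELIMITER);
        while (start >= 0) {
            // Look for the next delimiter after the current one.
            int next = packet.indexOf(DELIMITER, start + DELIMITER.length());
            int end = (next >= 0) ? next : packet.length();
            messages.add(packet.substring(start, end));
            start = next;
        }
        return messages;
    }

    public static void main(String[] args) {
        // prints [HeaderMsg1, HeaderMsg2, HeaderMsg3, HeaderMsg4, HeaderMsg5]
        System.out.println(split("HeaderMsg1HeaderMsg2HeaderMsg3HeaderMsg4HeaderMsg5"));
    }
}
```

Anything before the first delimiter is ignored in this sketch; whether that is the right choice depends on the device protocol.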
The device now expects 5 acknowledgements, one at a time, as shown below:
ACK1
ACK2
ACK3
ACK4
ACK5
But we see the following acknowledgements on the device side. Sometimes 2 or
more ACKs are combined:
ACK1ACK2
ACK3
ACK4ACK5
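Our suspicion (an assumption on our side, not something we have confirmed in MINA) is that the connection behaves as a plain byte stream, so separate writes carry no message boundary of their own. The stand-alone sketch below is not our gateway code. It uses a ByteArrayOutputStream as a stand-in for the socket and shows two separate writes ending up as one run of bytes. The class and method names are made up for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Illustration only: a byte stream does not remember where one write ended.
final class StreamCoalescingDemo {
    // Writes each ACK separately and returns everything the "receiver" sees.
    static String writeAcks(String... acks) {
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        for (String ack : acks) {
            byte[] bytes = ack.getBytes(StandardCharsets.US_ASCII);
            stream.write(bytes, 0, bytes.length); // one write per ACK
        }
        return new String(stream.toByteArray(), StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        // prints ACK1ACK2
        System.out.println(writeAcks("ACK1", "ACK2"));
    }
}
```

If this assumption is right, the device cannot rely on receiving one ACK per read unless the ACKs themselves carry a delimiter or a length.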
Our code is given below. The acknowledgement code is the session.write() call
in HelperThread.run(). Kindly review it and let us know what we are doing wrong.
How does "session.write()" work? Does it not write immediately? That is, does
it use a buffer and append multiple ACKs before sending them back to the
device?
Kindly suggest a solution.
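
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.mina.core.service.AbstractIoAcceptor;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioDatagramAcceptor;

public abstract class AbstractListener {
    protected static final int BUFFER_SIZE = 1024;

    // Creates the TCP or UDP acceptor; the implementing subclasses are not shown.
    protected abstract AbstractIoAcceptor initInternal();

    public void init(String portKey) throws IOException {
        AbstractIoAcceptor acceptor = initInternal();
        acceptor.setHandler(new RequestHandler());
        acceptor.getFilterChain().addLast("logger", new LoggingFilter());
        acceptor.getSessionConfig().setReadBufferSize(BUFFER_SIZE);
        acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
        if (acceptor instanceof NioDatagramAcceptor) {
            ((NioDatagramAcceptor) acceptor).getSessionConfig().setReuseAddress(true);
        }
        // The listening port is read from our server configuration.
        int port = Integer.parseInt(ServerConfigLoader.getInstance().getProperty(portKey));
        acceptor.bind(new InetSocketAddress(port));
    }
}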

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;

public class RequestHandler extends IoHandlerAdapter {
    // The LOGGER declaration is not shown here.

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        LOGGER.error("Exception caught in RequestHandler.exceptionCaught()", cause);
        session.closeNow();
    }

    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        // Convert the received bytes to a hex string without spaces.
        String msg = ((IoBuffer) message).getHexDump().replaceAll(" ", "");
        // Hand the packet over to a worker thread.
        ThreadAllocator.getInstance().allocateThread(session, msg);
    }
}
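
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.mina.core.session.IoSession;

public class ThreadAllocator {
    /** The thread pool. */
    private final ExecutorService executorService = Executors.newWorkStealingPool(
            Integer.parseInt(ServerConfigLoader.getInstance().getProperty("threadPoolSize")));

    // The getInstance() singleton accessor is not shown here.

    public void allocateThread(IoSession session, String message) {
        executorService.execute(new HelperThread(null, session, message));
    }
}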

import java.nio.channels.SocketChannel;
import java.util.List;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;

public class HelperThread implements Runnable {
    private String message;
    private IoSession session;
    private SocketChannel socketSession;

    public HelperThread(SocketChannel socketSession, IoSession session, String message) {
        if (session != null) {
            this.session = session;
            this.session.getConfig().setBothIdleTime(1800);
        } else if (socketSession != null) {
            this.socketSession = socketSession;
        }
        this.message = message;
    }

    @Override
    public void run() {
        // E.g. message = "HeaderMsg1HeaderMsg2HeaderMsg3HeaderMsg4HeaderMsg5"
        // handleMultipleMessage() (not shown) splits the packet on the delimiter.
        List<String> multiMessage = handleMultipleMessage(message);
        for (String singleMessage : multiMessage) {
            byte[] response = AckBuilderFactory.getBuilderFactory(singleMessage)
                    .getBuilder(singleMessage).build(singleMessage);
            if (response != null) {
                // Acknowledgement: one write per message.
                IoBuffer ioBuffer = IoBuffer.wrap(response);
                session.write(ioBuffer);
            }
        }
    }
}

Thanks & Regards,
Krishan Babbar
============================================================================================================================
Disclaimer: This message and the information contained herein is proprietary
and confidential and subject to the Tech Mahindra policy statement, you may
review the policy at http://www.techmahindra.com/Disclaimer.html externally
http://tim.techmahindra.com/tim/disclaimer.html internally within TechMahindra.
===========================================================================================================================