Hi All,

We are working on an IoT project and using "org.apache.mina" library (Version - 
2.0.16) in our gateway adaptor (GA - Java Code).
Device connects to our GA and GA send acknowledgement back to device for each 
message.
We are using multi-threading for handling each packet from a device(s). It is 
also possible to have multiple messages in a single packet from device. It 
would be based on some delimiter.

E.g. Following packet is having 5 messages. Here delimiter is "Header"
HeaderMsg1HeaderMsg2HeaderMsg3HeaderMsg4HeaderMsg5

Now Device is expecting 5 Acknowledgements one at a time like given below
ACK1
ACK2
ACK3
ACK4
ACK5

But we found following acknowledgements on device side. It is combining 2 or 
more ACKs sometimes.
ACK1ACK2
ACK3
ACK4ACK5

Our code is given below. Acknowledgement code is highlighted. Kindly review and 
let us know what are we doing wrong?
How does "session.write()" work? Will it not write immediately? I mean is it 
using some buffer and appending multiple ACKs before sending message back to 
device?

Kindly suggest the solution.


public abstract class AbstractListener {
                protected static final int BUFFER_SIZE = 1024;
                public void init(String portKey) throws IOException {
                                AbstractIoAcceptor acceptor = initInternal();
                                acceptor.setHandler(new RequestHandler());
                                acceptor.getFilterChain().addLast("logger", new 
LoggingFilter());
                                
acceptor.getSessionConfig().setReadBufferSize(BUFFER_SIZE);
                                
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
                                if (acceptor instanceof NioDatagramAcceptor) {
                                                ((NioDatagramAcceptor) 
acceptor).getSessionConfig().setReuseAddress(true);
                                }
                                acceptor.bind(new 
InetSocketAddress(Integer.parseInt(ServerConfigLoader.getInstance().getProperty(portKey))));
}
}

public class RequestHandler extends IoHandlerAdapter {
                public RequestHandler() {
                                super();
                }

                @Override
                public void exceptionCaught(IoSession session, Throwable cause) 
throws Exception {
                                LOGGER.error("Exception caught in 
RequestHandler.exceptionCaught()", cause);
                                session.closeNow();
                }

                @Override
                public void messageReceived(IoSession session, Object message) 
throws Exception {
                                try {
                                                SocketAddress remoteAddress = 
session.getRemoteAddress();
                                                String msg = 
((IoBuffer)message).getHexDump().replaceAll(" ", "");
                                                String strArray[] = 
remoteAddress.toString().split(":");
                                                
ThreadAllocator.getInstance().allocateThread(session, msg);
                                } catch (Exception e) {
                                                throw e;
                                }
                }
}

public class ThreadAllocator {

                /** The thread pool. */
                private ExecutorService executorService = Executors
                                                
.newWorkStealingPool(Integer.parseInt(ServerConfigLoader.getInstance().getProperty("threadPoolSize")));

                public void allocateThread(IoSession session, String message) {
                                executorService.execute(new 
HelperThread(null,session, message));
                }
}


public class HelperThread implements Runnable {
private String message;
                private IoSession session;
                private SocketChannel socketSession;

                public HelperThread(SocketChannel socketSession, IoSession 
session, String message) {
                                if (session != null) {
                                                this.session = session;
                                                
this.session.getConfig().setBothIdleTime(1800);
                                } else if (socketSession != null) {
                                                this.socketSession = 
socketSession;
                                }
                                this.message = message;
                }

                public void run() {
                                // E.g. message = 
"HeaderMsg1HeaderMsg2HeaderMsg3HeaderMsg4HeaderMsg5"

                                List<String> multiMessage = 
handleMultipleMessage(message);
                                for (String singleMessage : multiMessage) {
                                                byte[] response = 
AckBuilderFactory.getBuilderFactory(singleMessage)
                                                                                
.getBuilder(singleMessage).build(singleMessage);
                                                if (response != null) {
                                                                IoBuffer 
ioBuffer = IoBuffer.wrap(response);
                                                                
session.write(ioBuffer);
                                                }
                                }
                }
}

Thanks & Regards,
Krishan Babbar


============================================================================================================================
Disclaimer: This message and the information contained herein is proprietary 
and confidential and subject to the Tech Mahindra policy statement, you may 
review the policy at http://www.techmahindra.com/Disclaimer.html externally 
http://tim.techmahindra.com/tim/disclaimer.html internally within TechMahindra.
===========================================================================================================================

============================================================================================================================

Disclaimer:  This message and the information contained herein is proprietary 
and confidential and subject to the Tech Mahindra policy statement, you may 
review the policy at http://www.techmahindra.com/Disclaimer.html 
<http://www.techmahindra.com/Disclaimer.html> externally 
http://tim.techmahindra.com/tim/disclaimer.html 
<http://tim.techmahindra.com/tim/disclaimer.html> internally within 
TechMahindra.

============================================================================================================================

Reply via email to