There should be no aggregation, especially for Datagrams. On Tue, Sep 18, 2018 at 1:57 PM Raghavendra Balgi <[email protected]> wrote:
> Hey Krishan, You should probably consider using a CodecFilter as described > here - > > https://mina.apache.org/mina-project/userguide/ch9-codec-filter/ch9-codec-filter.html > > On Tue, Sep 18, 2018 at 11:11 PM Krishan Babbar < > [email protected]> > wrote: > > > Hi All, > > > > We are working on an IoT project and using "org.apache.mina" library > > (Version - 2.0.16) in our gateway adaptor (GA - Java Code). > > Device connects to our GA and GA send acknowledgement back to device for > > each message. > > We are using multi-threading for handling each packet from a device(s). > It > > is also possible to have multiple messages in a single packet from > device. > > It would be based on some delimiter. > > > > E.g. Following packet is having 5 messages. Here delimiter is "Header" > > HeaderMsg1HeaderMsg2HeaderMsg3HeaderMsg4HeaderMsg5 > > > > Now Device is expecting 5 Acknowledgements one at a time like given below > > ACK1 > > ACK2 > > ACK3 > > ACK4 > > ACK5 > > > > But we found following acknowledgements on device side. It is combining 2 > > or more ACKs sometimes. > > ACK1ACK2 > > ACK3 > > ACK4ACK5 > > > > Our code is given below. Acknowledgement code is highlighted. Kindly > > review and let us know what are we doing wrong? > > How does "session.write()" work? Will it not write immediately? I mean is > > it using some buffer and appending multiple ACKs before sending message > > back to device? > > > > Kindly suggest the solution. > > > > > > public abstract class AbstractListener { > > protected static final int BUFFER_SIZE = 1024; > > public void init(String portKey) throws IOException { > > AbstractIoAcceptor acceptor = > > initInternal(); > > acceptor.setHandler(new > RequestHandler()); > > > > acceptor.getFilterChain().addLast("logger", new LoggingFilter()); > > > > acceptor.getSessionConfig().setReadBufferSize(BUFFER_SIZE); > > > > acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); > > if (acceptor instanceof > > NioDatagramAcceptor) { > > ((NioDatagramAcceptor) > > acceptor).getSessionConfig().setReuseAddress(true); > > } > > acceptor.bind(new > > > InetSocketAddress(Integer.parseInt(ServerConfigLoader.getInstance().getProperty(portKey)))); > > } > > } > > > > public class RequestHandler extends IoHandlerAdapter { > > public RequestHandler() { > > super(); > > } > > > > @Override > > public void exceptionCaught(IoSession session, Throwable > > cause) throws Exception { > > LOGGER.error("Exception caught in > > RequestHandler.exceptionCaught()", cause); > > session.closeNow(); > > } > > > > @Override > > public void messageReceived(IoSession session, Object > > message) throws Exception { > > try { > > SocketAddress > > remoteAddress = session.getRemoteAddress(); > > String msg = > > ((IoBuffer)message).getHexDump().replaceAll(" ", ""); > > String strArray[] = > > remoteAddress.toString().split(":"); > > > > ThreadAllocator.getInstance().allocateThread(session, msg); > > } catch (Exception e) { > > throw e; > > } > > } > > } > > > > public class ThreadAllocator { > > > > /** The thread pool. */ > > private ExecutorService executorService = Executors > > > > > .newWorkStealingPool(Integer.parseInt(ServerConfigLoader.getInstance().getProperty("threadPoolSize"))); > > > > public void allocateThread(IoSession session, String > > message) { > > executorService.execute(new > > HelperThread(null,session, message)); > > } > > } > > > > > > public class HelperThread implements Runnable { > > private String message; > > private IoSession session; > > private SocketChannel socketSession; > > > > public HelperThread(SocketChannel socketSession, > IoSession > > session, String message) { > > if (session != null) { > > this.session = session; > > > > this.session.getConfig().setBothIdleTime(1800); > > } else if (socketSession != null) { > > this.socketSession = > > socketSession; > > } > > this.message = message; > > } > > > > public void run() { > > // E.g. message = > > "HeaderMsg1HeaderMsg2HeaderMsg3HeaderMsg4HeaderMsg5" > > > > List<String> multiMessage = > > handleMultipleMessage(message); > > for (String singleMessage : > multiMessage) { > > byte[] response = > > AckBuilderFactory.getBuilderFactory(singleMessage) > > > > .getBuilder(singleMessage).build(singleMessage); > > if (response != null) { > > IoBuffer > > ioBuffer = IoBuffer.wrap(response); > > > > session.write(ioBuffer); > > } > > } > > } > > } > > > > Thanks & Regards, > > Krishan Babbar > > > > > > > > > ============================================================================================================================ > > Disclaimer: This message and the information contained herein is > > proprietary and confidential and subject to the Tech Mahindra policy > > statement, you may review the policy at > > http://www.techmahindra.com/Disclaimer.html externally > > http://tim.techmahindra.com/tim/disclaimer.html internally within > > TechMahindra. > > > > > =========================================================================================================================== > > > > > > > ============================================================================================================================ > > > > Disclaimer: This message and the information contained herein is > > proprietary and confidential and subject to the Tech Mahindra policy > > statement, you may review the policy at > > http://www.techmahindra.com/Disclaimer.html < > > http://www.techmahindra.com/Disclaimer.html> externally > > http://tim.techmahindra.com/tim/disclaimer.html < > > http://tim.techmahindra.com/tim/disclaimer.html> internally within > > TechMahindra. > > > > > > > ============================================================================================================================ > > > > > -- > Raghavendra Balgi > Bangalore > Phone: +91 7795816719 > Email : [email protected] <http://[email protected]/[email protected]> >
