There should be no aggregation, especially for Datagrams.
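
If the device link is TCP rather than UDP (an assumption; the quoted code supports both), the peer may still see ACKs run together, because a TCP stream does not preserve the boundaries of individual writes. One way around that is explicit framing on both sides. Below is a minimal sketch using only the JDK. `AckFramer` and its 2-byte length prefix are hypothetical, not part of MINA, and the device would have to implement the same framing.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not a MINA class): length-prefixed framing for ACKs. */
class AckFramer {
    // Bytes received so far that do not yet form a complete frame.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    /** Sender side: prefix the payload with its length (unsigned 16-bit, big-endian). */
    static byte[] encode(byte[] payload) {
        if (payload.length > 0xFFFF) {
            throw new IllegalArgumentException("payload too large for a 2-byte length");
        }
        ByteBuffer buf = ByteBuffer.allocate(2 + payload.length);
        buf.putShort((short) payload.length);
        buf.put(payload);
        return buf.array();
    }

    /** Receiver side: feed bytes as they arrive; returns every frame completed by this chunk. */
    List<byte[]> feed(byte[] chunk) {
        pending.write(chunk, 0, chunk.length);
        ByteBuffer buf = ByteBuffer.wrap(pending.toByteArray());
        List<byte[]> frames = new ArrayList<>();
        while (buf.remaining() >= 2) {
            buf.mark();
            int length = buf.getShort() & 0xFFFF;
            if (buf.remaining() < length) {
                buf.reset(); // incomplete frame: rewind to the length prefix and wait
                break;
            }
            byte[] frame = new byte[length];
            buf.get(frame);
            frames.add(frame);
        }
        // Keep only the unconsumed tail for the next call.
        pending.reset();
        pending.write(buf.array(), buf.position(), buf.remaining());
        return frames;
    }

    public static void main(String[] args) {
        byte[] ack1 = encode("ACK1".getBytes(StandardCharsets.US_ASCII));
        byte[] ack2 = encode("ACK2".getBytes(StandardCharsets.US_ASCII));
        // Simulate the peer receiving both ACKs coalesced into a single read.
        byte[] coalesced = new byte[ack1.length + ack2.length];
        System.arraycopy(ack1, 0, coalesced, 0, ack1.length);
        System.arraycopy(ack2, 0, coalesced, ack1.length, ack2.length);

        AckFramer receiver = new AckFramer();
        for (byte[] frame : receiver.feed(coalesced)) {
            System.out.println(new String(frame, StandardCharsets.US_ASCII));
        }
    }
}
```

On the gateway side, each ACK would go through `AckFramer.encode(...)` before being wrapped in an `IoBuffer` and written. The receiver keeps one `AckFramer` per connection, so a frame split across two reads is reassembled on the next call to `feed`.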

On Tue, Sep 18, 2018 at 1:57 PM Raghavendra Balgi <[email protected]> wrote:

> Hey Krishan, you should probably consider using a codec filter
> (ProtocolCodecFilter), as described here:
>
> https://mina.apache.org/mina-project/userguide/ch9-codec-filter/ch9-codec-filter.html
>
> On Tue, Sep 18, 2018 at 11:11 PM Krishan Babbar <[email protected]> wrote:
>
> > Hi All,
> >
> > We are working on an IoT project and using the "org.apache.mina" library
> > (version 2.0.16) in our gateway adaptor (GA, Java code).
> > A device connects to our GA, and the GA sends an acknowledgement back to
> > the device for each message.
> > We use multi-threading to handle each packet from the device(s). A single
> > packet from a device may also contain multiple messages, separated by a
> > delimiter.
> >
> > E.g. the following packet contains 5 messages; the delimiter is "Header":
> > HeaderMsg1HeaderMsg2HeaderMsg3HeaderMsg4HeaderMsg5
> >
> > The device expects 5 acknowledgements, one at a time, as shown below:
> > ACK1
> > ACK2
> > ACK3
> > ACK4
> > ACK5
> >
> > But we see the following acknowledgements on the device side; two or more
> > ACKs are sometimes combined:
> > ACK1ACK2
> > ACK3
> > ACK4ACK5
> >
> > Our code is given below. The acknowledgement code is highlighted. Kindly
> > review it and let us know what we are doing wrong.
> > How does "session.write()" work? Does it not write immediately? That is,
> > does it use a buffer and append multiple ACKs before sending the data
> > back to the device?
> >
> > Kindly suggest a solution.
> >
> >
> > public abstract class AbstractListener {
> >     protected static final int BUFFER_SIZE = 1024;
> >
> >     public void init(String portKey) throws IOException {
> >         AbstractIoAcceptor acceptor = initInternal();
> >         acceptor.setHandler(new RequestHandler());
> >         acceptor.getFilterChain().addLast("logger", new LoggingFilter());
> >         acceptor.getSessionConfig().setReadBufferSize(BUFFER_SIZE);
> >         acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
> >         if (acceptor instanceof NioDatagramAcceptor) {
> >             ((NioDatagramAcceptor) acceptor).getSessionConfig().setReuseAddress(true);
> >         }
> >         acceptor.bind(new InetSocketAddress(
> >                 Integer.parseInt(ServerConfigLoader.getInstance().getProperty(portKey))));
> >     }
> > }
> >
> > public class RequestHandler extends IoHandlerAdapter {
> >     public RequestHandler() {
> >         super();
> >     }
> >
> >     @Override
> >     public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
> >         LOGGER.error("Exception caught in RequestHandler.exceptionCaught()", cause);
> >         session.closeNow();
> >     }
> >
> >     @Override
> >     public void messageReceived(IoSession session, Object message) throws Exception {
> >         try {
> >             SocketAddress remoteAddress = session.getRemoteAddress();
> >             String msg = ((IoBuffer) message).getHexDump().replaceAll(" ", "");
> >             String strArray[] = remoteAddress.toString().split(":");
> >             ThreadAllocator.getInstance().allocateThread(session, msg);
> >         } catch (Exception e) {
> >             throw e;
> >         }
> >     }
> > }
> >
> > public class ThreadAllocator {
> >
> >     /** The thread pool. */
> >     private ExecutorService executorService = Executors.newWorkStealingPool(
> >             Integer.parseInt(ServerConfigLoader.getInstance().getProperty("threadPoolSize")));
> >
> >     public void allocateThread(IoSession session, String message) {
> >         executorService.execute(new HelperThread(null, session, message));
> >     }
> > }
> >
> >
> > public class HelperThread implements Runnable {
> >     private String message;
> >     private IoSession session;
> >     private SocketChannel socketSession;
> >
> >     public HelperThread(SocketChannel socketSession, IoSession session, String message) {
> >         if (session != null) {
> >             this.session = session;
> >             this.session.getConfig().setBothIdleTime(1800);
> >         } else if (socketSession != null) {
> >             this.socketSession = socketSession;
> >         }
> >         this.message = message;
> >     }
> >
> >     public void run() {
> >         // E.g. message = "HeaderMsg1HeaderMsg2HeaderMsg3HeaderMsg4HeaderMsg5"
> >
> >         List<String> multiMessage = handleMultipleMessage(message);
> >         for (String singleMessage : multiMessage) {
> >             byte[] response = AckBuilderFactory.getBuilderFactory(singleMessage)
> >                     .getBuilder(singleMessage).build(singleMessage);
> >             if (response != null) {
> >                 IoBuffer ioBuffer = IoBuffer.wrap(response);
> >                 session.write(ioBuffer);
> >             }
> >         }
> >     }
> > }
> >
> > Thanks & Regards,
> > Krishan Babbar
> >
> >
> >
> >
> ============================================================================================================================
> > Disclaimer: This message and the information contained herein is
> > proprietary and confidential and subject to the Tech Mahindra policy
> > statement, you may review the policy at
> > http://www.techmahindra.com/Disclaimer.html externally
> > http://tim.techmahindra.com/tim/disclaimer.html internally within
> > TechMahindra.
> >
> >
> ===========================================================================================================================
>
>
> --
> Raghavendra Balgi
> Bangalore
> Phone: +91 7795816719
> Email: [email protected]
>
