[ https://issues.apache.org/jira/browse/DIRMINA-1149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17417012#comment-17417012 ]
Zhang Hua commented on DIRMINA-1149:
------------------------------------

I traced and debugged the source code and found that the likely cause of the problem is the ProtocolCodecFilter.filterWrite method:

{code:java}
try {
    // Now we can try to encode the response
    encoder.encode(session, message, encoderOut);

    // Send it directly
    Queue<Object> bufferQueue = ((AbstractProtocolEncoderOutput) encoderOut).getMessageQueue();

    // Write all the encoded messages now
    while (!bufferQueue.isEmpty()) {
        Object encodedMessage = bufferQueue.poll();

        if (encodedMessage == null) {
            break;
        }

        // Flush only when the buffer has remaining.
        if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
            SocketAddress destination = writeRequest.getDestination();
            WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination);

            nextFilter.filterWrite(session, encodedWriteRequest);
        }
    }

    // Call the next filter
    nextFilter.filterWrite(session, new MessageWriteRequest(writeRequest));
} catch (Exception e) {
{code}

In a multi-threaded environment, where two threads drain the same bufferQueue, the following interleaving is possible:

step1: Thread A (user=1, index=1) calls encoder.encode(session, message, encoderOut);
step2: Thread B executes Object encodedMessage = bufferQueue.poll(); and gets (user=1, index=1)
step3: Thread A sees bufferQueue.isEmpty() == true and moves on to the next index
step4: Thread A (user=1, index=2) calls encoder.encode(session, message, encoderOut);
step5: Thread A executes Object encodedMessage = bufferQueue.poll(); and gets (user=1, index=2)
step6: Thread A executes nextFilter.filterWrite(session, encodedWriteRequest); // fires data (user=1, index=2)
step7: Thread B executes nextFilter.filterWrite(session, encodedWriteRequest); // fires data (user=1, index=1)

So the root cause is that ProtocolCodecFilter is not thread-safe.

I wrote a simple SyncProtocolCodecFilter to avoid this problem:

{code:java}
public class SyncProtocolCodecFilter extends ProtocolCodecFilter {

    public SyncProtocolCodecFilter(ProtocolCodecFactory factory) {
        super(factory);
    }

    @Override
    public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
        // Serialize encode + drain + forward for each session
        synchronized (session) {
            super.filterWrite(nextFilter, session, writeRequest);
        }
    }
}
{code}

> IoSession.write under multi-thread enviroment, lose message order
> -----------------------------------------------------------------
>
>                 Key: DIRMINA-1149
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-1149
>             Project: MINA
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 2.0.21
>         Environment: Java 1.8
> Windows 10
>            Reporter: Zhang Hua
>            Priority: Minor
>         Attachments: minatest.zip
>
>
> I am writing a stress test for the thread safety of the IoSession.write
> method and found that message order is lost.
> My test method is as follows:
> 1. The client test code starts 50 threads that share the same IoSession object.
> 2. Each test thread simulates a user and sends data in sequence.
> I believe that the IoFilter I use meets the thread-safety conditions.
> The result I expect is that the server receives each user's data in order,
> but it does not.
> Synchronizing on session.write makes the problem go away.
> Do I really have to synchronize on the session to solve this issue?
>
> ClientDemo.java
> {code:java}
> public class ClientDemo {
>     public static void main(String[] args) throws Exception {
>         NioSocketConnector connector = new NioSocketConnector();
>         DefaultIoFilterChainBuilder chain = connector.getFilterChain();
>         chain.addLast("mdc", new MdcInjectionFilter());
>         chain.addLast("codec", new ProtocolCodecFilter(new MessagePackCodecFactory()));
>         TcpRPCHandler responseHandler = new TcpRPCHandler();
>         connector.setHandler(responseHandler);
>         connector.setConnectTimeoutCheckInterval(30);
>         ConnectFuture cf = connector.connect(new InetSocketAddress("127.0.0.1", 9999));
>         IoSession session = cf.awaitUninterruptibly().getSession();
>         ExecutorService executor = Executors.newFixedThreadPool(50);
>         for (int i = 0; i < 50; ++i) {
>             executor.execute(new SenderWorker(i, session));
>         }
>         while (true) {
>             Thread.sleep(5000);
>             System.out.println("client alive......");
>             // responseHandler.printProgress();
>         }
>     }
> }
>
> class SenderWorker implements Runnable {
>     private int userId;
>     private IoSession session;
>
>     public SenderWorker(int userId, IoSession session) {
>         this.userId = userId;
>         this.session = session;
>     }
>
>     @Override
>     public void run() {
>         for (int i = 0; i < 100; ++i) {
>             MessageData data = new MessageData(userId, i);
>             /*synchronized (session)*/ {
>                 session.write(data);
>             }
>             if (i % 5 == 0) {
>                 try {
>                     Thread.sleep(10);
>                 } catch (Exception e) {
>                 }
>             }
>         }
>     }
> }
> {code}
> See the attachment for the complete code; I use Maven to manage the project.
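
For illustration only, the interleaving in step1 to step7 of the comment can be replayed without MINA. The sketch below is a stand-alone model, not MINA code: EncodeQueueDemo, replayInterleaving, sendSynchronized, perUserOrdered and sessionLock are hypothetical names, and a plain ConcurrentLinkedQueue stands in for the shared bufferQueue. It assumes, as the steps do, that both threads drain the same queue. The first method replays the steps on a single thread so the result is deterministic; the second runs real threads with encode, drain and forward under one lock, which mirrors the idea behind SyncProtocolCodecFilter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-alone model; none of these names come from MINA.
class EncodeQueueDemo {

    // Replays step1..step7 on one thread so the outcome is deterministic.
    static List<Integer> replayInterleaving() {
        Queue<Integer> bufferQueue = new ConcurrentLinkedQueue<>(); // assumed shared per session
        List<Integer> wire = new ArrayList<>();                     // what the next filter receives

        bufferQueue.add(1);                   // step1: A encodes index=1
        Integer heldByB = bufferQueue.poll(); // step2: B polls and gets index=1
        // step3: A finds the queue empty and moves on to its next message
        bufferQueue.add(2);                   // step4: A encodes index=2
        Integer heldByA = bufferQueue.poll(); // step5: A polls and gets index=2
        wire.add(heldByA);                    // step6: A forwards index=2
        wire.add(heldByB);                    // step7: B forwards index=1
        return wire;
    }

    // Same model, but each write does encode + drain + forward under one lock.
    static List<int[]> sendSynchronized(int users, int perUser) {
        Queue<int[]> bufferQueue = new ConcurrentLinkedQueue<>();
        List<int[]> wire = new ArrayList<>();
        Object sessionLock = new Object(); // stands in for synchronized (session)

        List<Thread> threads = new ArrayList<>();
        for (int u = 0; u < users; u++) {
            final int user = u;
            Thread t = new Thread(() -> {
                for (int i = 0; i < perUser; i++) {
                    synchronized (sessionLock) {
                        bufferQueue.add(new int[] {user, i}); // "encode"
                        int[] encoded;
                        while ((encoded = bufferQueue.poll()) != null) {
                            wire.add(encoded);                // "forward to next filter"
                        }
                    }
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for senders", e);
            }
        }
        return wire;
    }

    // True if every user's indexes appear on the wire as 0, 1, 2, ... in order.
    static boolean perUserOrdered(List<int[]> wire, int users) {
        int[] next = new int[users];
        for (int[] m : wire) {
            if (m[1] != next[m[0]]) {
                return false;
            }
            next[m[0]]++;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(replayInterleaving());                          // prints [2, 1]
        System.out.println(perUserOrdered(sendSynchronized(50, 100), 50)); // prints true
    }
}
```

The replay ends with index=2 ahead of index=1, which matches the reordering seen by the server. In the locked variant no thread can poll a message that another thread has just encoded, so each user's messages stay in order, at the cost of serializing all writes on the session.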