[ 
https://issues.apache.org/jira/browse/DIRMINA-1149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17417012#comment-17417012
 ] 

Zhang Hua commented on DIRMINA-1149:
------------------------------------

I tracked and debugged the source code, and found that the cause of the problem 
may be the ProtoColCoderfilter.filterWriter method
{code:java}
        try {
            // Now we can try to encode the response
            encoder.encode(session, message, encoderOut);            // Send it 
directly
            Queue<Object> bufferQueue = ((AbstractProtocolEncoderOutput) 
encoderOut).getMessageQueue();            // Write all the encoded messages now
            while (!bufferQueue.isEmpty()) {
                Object encodedMessage = bufferQueue.poll();                if 
(encodedMessage == null) {
                    break;
                }                // Flush only when the buffer has remaining.
                if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) 
encodedMessage).hasRemaining()) {
                    SocketAddress destination = writeRequest.getDestination();
                    WriteRequest encodedWriteRequest = new 
EncodedWriteRequest(encodedMessage, null, destination);                    
nextFilter.filterWrite(session, encodedWriteRequest);
                }
            }            // Call the next filter
            nextFilter.filterWrite(session, new 
MessageWriteRequest(writeRequest));
        } catch (Exception e) {

{code}
In a multi-threaded environment,

step1: Thread A user=1, index=1, call encoder.encode(session, message, 
encoderOut);

step2: Thread B executes Object encodedMessage = bufferQueue.poll(); to get 
(user=1, index=1)

step3: Thread A bufferQueue.isEmpty()=true, next index

step4: Thread A user=1, index=2, call encoder.encode(session, message, 
encoderOut);

step5: Thread A executes Object encodedMessage = bufferQueue.poll(); to get 
(user=1, index=2)

step6: Thread A executes nextFilter.filterWrite(session, 
encodedWriteRequest);// fire data (user=1,index=2)

step7: Thread B executes nextFilter.filterWrite(session, 
encodedWriteRequest);// fire data (user=1,index=1)

 

So the root cause is that ProtolColCoderFilter is not thread-safe
I wrote a simple SyncProtolcolCoderFilter to avoid this problem

 
{code:java}
public class SyncProtocolCodecFilter extends ProtocolCodecFilter {
    public SyncProtocolCodecFilter(ProtocolCodecFactory factory) {
        super(factory);
    }    @Override
    public void filterWrite(NextFilter nextFilter, IoSession session, 
WriteRequest writeRequest) throws Exception {
        synchronized (session) {
            super.filterWrite(nextFilter, session, writeRequest);
        }
    }
}

{code}

> IoSession.write under multi-thread enviroment, lose message order
> -----------------------------------------------------------------
>
>                 Key: DIRMINA-1149
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-1149
>             Project: MINA
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 2.0.21
>         Environment: Java 1.8
> Windows 10
>            Reporter: Zhang Hua
>            Priority: Minor
>         Attachments: minatest.zip
>
>
> I am writing a stress-test that tests multi-thread safetyness of 
> IoSession.write method, and find lose message order. 
> My test method is as follows
> 1. The client test code starts 50 threads, sharing the same IoSession object
> 2. Each test thread simulates a user and sends data in sequence
> I believe that the IoFilter I use meets the thread safety conditions
> The result I expect is that the server receives the data of each user in an 
> orderly manner, but not
> Synchronizing on the session.write makes the problem go away;
> Do I really have to synchronize on the session to solve this issue?
>  
> ClientDemo.java
> {code:java}
> public class ClientDemo {    
> public static void main(String[] args) throws Exception {
>         NioSocketConnector connector = new NioSocketConnector();
>         DefaultIoFilterChainBuilder chain = connector.getFilterChain();
>         chain.addLast("mdc", new MdcInjectionFilter());
>         chain.addLast("codec", new ProtocolCodecFilter(new 
> MessagePackCodecFactory()));
>         TcpRPCHandler responseHandler = new TcpRPCHandler();
>         connector.setHandler(responseHandler);
>         connector.setConnectTimeoutCheckInterval(30);
>         ConnectFuture cf = connector.connect(new 
> InetSocketAddress("127.0.0.1", 9999));
>         IoSession session = cf.awaitUninterruptibly().getSession();        
> ExecutorService executor = Executors.newFixedThreadPool(50);
>         for (int i = 0; i < 50; ++i) {
>             executor.execute(new SenderWorker(i, session));
>         }
>         while (true) {
>             Thread.sleep(5000);
>             System.out.println("client alive......");
>             //            responseHandler.printProgress();
>         }    }
> }
> class SenderWorker implements Runnable {
>     private int userId;
>     private IoSession session;    public SenderWorker(int userId, IoSession 
> session) {
>         this.userId = userId;
>         this.session = session;
>     }    @Override
>     public void run() {
>         for (int i = 0; i < 100; ++i) {
>             MessageData data = new MessageData(userId, i);
>             /*synchronized (session)*/ {
>                 session.write(data);
>             }
>             if (i % 5 == 0) {
>                 try {
>                     Thread.sleep(10);
>                 } catch (Exception e) {
>                 }
>             }
>         }
>     }
> }
> {code}
> See the attachment for the complete code, I use maven to manage the project



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@mina.apache.org
For additional commands, e-mail: dev-h...@mina.apache.org

Reply via email to