wenqi.huang created ROCKETMQ-267:
------------------------------------

             Summary: server may reject messages when pdflush writes dirty data 
back to disk
                 Key: ROCKETMQ-267
                 URL: https://issues.apache.org/jira/browse/ROCKETMQ-267
             Project: Apache RocketMQ
          Issue Type: Bug
    Affects Versions: 4.1.0-incubating
         Environment: linux
            Reporter: wenqi.huang
            Assignee: vongosling
            Priority: Critical


I found the following error in the client's log:

2017-08-10 09:06:57  ERROR [DubboServerHandler-10.28.109.45:20994-thread-475] c.c.d.a.c.b.r.RocketMQMsgProducer[RocketMQMsgProducer.java:42] -> Send ons msg failed, topic=TopicSubOrderDataSync, tag=TagActivityDataSync, key=activity-order-411320385584760066, msg=411320385584760066
org.apache.rocketmq.client.exception.MQBrokerException: CODE: 2  DESC: [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 208ms, size of queue: 17
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
        at org.apache.rocketmq.client.impl.MQClientAPIImpl.processSendResponse(MQClientAPIImpl.java:531) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
        at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:345) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
        at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:327) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
        at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:290) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
        at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:688) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
        at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:458) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
        at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1049) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
        at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1008) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
        at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:204) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]

I then looked into store.log on the RocketMQ broker and found the following 
line logged at the same time:

2017-08-10 09:06:57 INFO FlushRealTimeService - Flush data to disk costs 1240 ms

I looked into the source code and found that RocketMQ has index files that it 
does not flush immediately, because they are written randomly. Only when an 
index file is full (about 500MB) does RocketMQ force it to disk, so up to about 
500MB of dirty pages can accumulate. (I have executed the bin/os.sh shipped 
with RocketMQ, which changes the default behavior of Linux pdflush.) Because 
[vm.dirty_ratio] is 50 and the available memory on my Linux machine is about 
1600MB, 500MB does not exceed 50% of 1600MB, so pdflush is not triggered by the 
ratio threshold. My guess is therefore that the later writeback of this dirty 
data hurts the response time (RT) of writes.
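
The threshold arithmetic above can be written out as a minimal sketch. It uses only the numbers from this report and simplifies the kernel's accounting to "ratio percent of available memory"; the class and method names are made up for illustration.

```java
public class DirtyThresholdSketch {
    /** Simplified threshold: ratioPercent of the available memory, in MB. */
    public static long thresholdMb(long availableMb, int ratioPercent) {
        return availableMb * ratioPercent / 100;
    }

    public static void main(String[] args) {
        long availableMb = 1600;  // available memory reported above
        int dirtyRatio = 50;      // vm.dirty_ratio reported above
        long indexDirtyMb = 500;  // worst-case dirty index data reported above

        long limitMb = thresholdMb(availableMb, dirtyRatio);
        System.out.println("threshold MB: " + limitMb);
        System.out.println("index data stays below threshold: " + (indexDirtyMb < limitMb));
    }
}
```

With these inputs the threshold is 800MB, so a full 500MB index file can sit in the page cache without crossing it.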

So I wrote a test case that confirms this. The code is:

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.util.Arrays;

public class MappedFileTest {
    public static void main(String[] args) throws IOException, InterruptedException {
        // Mock RocketMQ's index file.
        String indexFile = "/home/admin/rocketmq-data/index/index";
        int indexFileToWriteInMB = 180;
        FileChannel indexFileChannel = new RandomAccessFile(new File(indexFile), "rw").getChannel();
        final MappedByteBuffer indexFileBuffer =
                indexFileChannel.map(MapMode.READ_WRITE, 0, 1024 * 1024 * 500); // 500 MB

        // Write some dirty data. Note that the data size must stay below the
        // vm.dirty_background_ratio threshold. Because vm.dirty_expire_centisecs is
        // set to 3000, pdflush writes the dirty data back to disk after about 30 seconds.
        byte[] bs = new byte[1024 * 1024 * indexFileToWriteInMB]; // 180 MB
        Arrays.fill(bs, (byte) 1);
        indexFileBuffer.put(bs);

        // Mock RocketMQ's commitlog file.
        String commitLogFile = "/home/admin/rocketmq-data/commitlog/commitlog";
        FileChannel commitLogChannel = new RandomAccessFile(new File(commitLogFile), "rw").getChannel();
        final MappedByteBuffer commitLogBuffer =
                commitLogChannel.map(MapMode.READ_WRITE, 0, 1024 * 1024 * 1024); // 1 GB

        final Object lockObj = new Object();

        // Mock FlushCommitLogService, which writes the commitlog's dirty data back to disk.
        FlushCommitLogService commitLogService = new FlushCommitLogService(lockObj, commitLogBuffer);
        commitLogService.start();

        // Mock messageReceived, which writes data into the commitlog file.
        mockMessageReceived(lockObj, commitLogBuffer);

        // After about 30 seconds, Linux pdflush writes back the index file's dirty data
        // and output like the following appears. The slow write blocks the thread that
        // handles messages, so clients fail to send messages until pdflush is done:
        // ---write cost ms: 213
        // flushToDisk cost ms:502
    }

    private static void mockMessageReceived(Object lockObj, MappedByteBuffer commitLogBuffer) {
        byte[] bs = new byte[1024 * 10]; // 10 KB
        Arrays.fill(bs, (byte) 1);
        for (int i = 0; i < 1000; i++) {
            commitLogBuffer.position(bs.length * i);
            long start = System.currentTimeMillis();
            commitLogBuffer.put(bs);
            long elapsed = System.currentTimeMillis() - start;
            // Only report writes that took longer than 1 ms.
            if (elapsed > 1) {
                System.out.println("---write cost ms:" + elapsed);
            }

            // Every second write (20 KB of new dirty data), wake up the flush thread.
            if (i != 0 && i % 2 == 0) {
                synchronized (lockObj) {
                    lockObj.notify();
                }
            }

            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static class FlushCommitLogService extends Thread {
        private final Object lockObj;
        private final MappedByteBuffer commitLogBuffer;

        public FlushCommitLogService(Object lockObj, MappedByteBuffer commitLogBuffer) {
            this.lockObj = lockObj;
            this.commitLogBuffer = commitLogBuffer;
        }

        @Override
        public void run() {
            while (true) {
                // Block until the writer thread signals that new dirty data is available.
                synchronized (lockObj) {
                    try {
                        lockObj.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                System.out.println("flushToDisk started");
                long start = System.currentTimeMillis();
                commitLogBuffer.force(); // force the mapped commitlog pages to disk
                long elapsed = System.currentTimeMillis() - start;
                System.out.println("flushToDisk cost ms:" + elapsed);
            }
        }
    }
}

Before running this test case, make sure you are running it on Linux and have 
set the following kernel parameters:

vm.dirty_background_ratio = 50
vm.dirty_ratio = 50
vm.dirty_expire_centisecs = 3000
vm.dirty_writeback_centisecs = 500
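
To confirm the four settings above before a run, something like the following could be used. It is only a sketch: it relies on the standard Linux convention that the sysctl key vm.X is exposed as the file /proc/sys/vm/X, and the class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class VmDirtySettingsCheck {
    /** The four values listed in this report, keyed by file name under /proc/sys/vm. */
    public static Map<String, Long> expected() {
        Map<String, Long> m = new LinkedHashMap<>();
        m.put("dirty_background_ratio", 50L);
        m.put("dirty_ratio", 50L);
        m.put("dirty_expire_centisecs", 3000L);
        m.put("dirty_writeback_centisecs", 500L);
        return m;
    }

    /** Parses the content of one /proc/sys/vm file, for example "50\n". */
    public static long parse(String raw) {
        return Long.parseLong(raw.trim());
    }

    /** Returns one description per setting whose actual value differs or is missing. */
    public static List<String> mismatches(Map<String, Long> actual, Map<String, Long> expected) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Long> e : expected.entrySet()) {
            Long got = actual.get(e.getKey());
            if (!e.getValue().equals(got)) {
                out.add("vm." + e.getKey() + ": expected " + e.getValue() + ", found " + got);
            }
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        Path procVm = Paths.get("/proc/sys/vm"); // Linux only
        Map<String, Long> actual = new LinkedHashMap<>();
        for (String name : expected().keySet()) {
            byte[] raw = Files.readAllBytes(procVm.resolve(name));
            actual.put(name, parse(new String(raw, StandardCharsets.US_ASCII)));
        }
        List<String> diffs = mismatches(actual, expected());
        if (diffs.isEmpty()) {
            System.out.println("all settings match");
        } else {
            diffs.forEach(System.out::println);
        }
    }
}
```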

The code does the following:
1. Opens an index file and writes some dirty data into it (without flushing 
manually, leaving it to Linux pdflush to write it back to disk).
2. Opens a commitLog file and writes dirty data into it. Each time the dirty 
data reaches 20KB, it notifies another thread to force the data to disk. 
In the output, the RT is healthy at this point.
3. Waits about 30 seconds, until pdflush wakes up and finds that the dirty data 
in the index file has existed for 30 seconds; pdflush then writes it back to 
disk (you can execute [cat /proc/meminfo|grep Dirty] to show the size of all 
dirty data). At the same time, the output shows lines like this:

---write cost ms: 213
flushToDisk cost ms:502

The RT of the write call is unacceptable. It blocks the thread that handles 
messages, so clients fail to send messages until pdflush is done. If you make 
the dirty data in the index file larger, the write RT grows as well.
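
To watch the dirty data drain while the test runs, the [cat /proc/meminfo|grep Dirty] check from step 3 can also be done from Java. This is a rough sketch with a hypothetical class name; it assumes the usual /proc/meminfo line format "Dirty:   <number> kB" and polls once per second for one minute.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

public class DirtyPagesProbe {
    /** Returns the value of the "Dirty:" line in kB, or -1 if the line is absent. */
    public static long dirtyKb(List<String> meminfoLines) {
        for (String line : meminfoLines) {
            if (line.startsWith("Dirty:")) {
                // Example line: "Dirty:            184320 kB"
                String[] parts = line.trim().split("\\s+");
                return Long.parseLong(parts[1]);
            }
        }
        return -1;
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        for (int i = 0; i < 60; i++) {
            List<String> lines = Files.readAllLines(Paths.get("/proc/meminfo"), StandardCharsets.US_ASCII);
            System.out.println("Dirty kB: " + dirtyKb(lines));
            Thread.sleep(1000);
        }
    }
}
```

Running it next to MappedFileTest should show the Dirty value dropping at about the time the slow writes are printed, if the guess in this report is right.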

bin/os.sh sets vm.dirty_writeback_centisecs to 360000 (one hour). In store.log, 
the [Flush data to disk costs * ms] entries appear hourly, which makes my guess 
more convincing.

I noticed that the index file has three sections: the first is the header, 
the second is an index written randomly, and the third is another type of 
index written sequentially. So maybe the second section could be moved into a 
separate file, which would use only 20MB of memory and make writeback faster. 
The third section could then be written sequentially and written back to disk 
as soon as possible, just like the commitlog. However, keeping these two index 
files consistent when recovering from a system crash would also be a problem.
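
The size of each section can be estimated with a short calculation. The constants below are assumptions, not figures from this report: a 40-byte header, 4-byte hash slots, 20-byte index entries, 5,000,000 slots and 20,000,000 entries are what I believe the defaults to be, and they should be checked against the store configuration of the version in use.

```java
public class IndexFileLayoutSketch {
    // Assumed sizes, to be verified against the RocketMQ version in use.
    public static final int HEADER_BYTES = 40;
    public static final int HASH_SLOT_BYTES = 4;
    public static final int INDEX_ENTRY_BYTES = 20;

    /** Size of the randomly written hash-slot section. */
    public static long hashSlotSectionBytes(long slots) {
        return slots * HASH_SLOT_BYTES;
    }

    /** Size of the sequentially written index-entry section. */
    public static long indexSectionBytes(long entries) {
        return entries * INDEX_ENTRY_BYTES;
    }

    /** Total file size: header plus both sections. */
    public static long totalBytes(long slots, long entries) {
        return HEADER_BYTES + hashSlotSectionBytes(slots) + indexSectionBytes(entries);
    }

    public static void main(String[] args) {
        long slots = 5_000_000L;    // assumed default number of hash slots
        long entries = 20_000_000L; // assumed default number of index entries
        System.out.println("hash slot section bytes: " + hashSlotSectionBytes(slots));
        System.out.println("index section bytes:     " + indexSectionBytes(entries));
        System.out.println("total bytes:             " + totalBytes(slots, entries));
    }
}
```

Under these assumptions the randomly written section is 20,000,000 bytes, which is in line with the 20MB mentioned above, while the sequential section accounts for almost all of the remaining file size.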



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
