This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch 5.0.0-beta
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta by this push:
     new ef37465e5 Fix bug that returns IN_SYNC_REPLICAS_NOT_ENOUGH when 
enableSlaveActingMaster is false (#4554)
ef37465e5 is described below

commit ef37465e593a06fdef4eb74b539a4146a4d4839c
Author: rongtong <[email protected]>
AuthorDate: Tue Jul 5 10:15:47 2022 +0800

    Fix bug that returns IN_SYNC_REPLICAS_NOT_ENOUGH when 
enableSlaveActingMaster is false (#4554)
    
    * Automatic degradation is not performed when enableSlaveActingMaster=false
    
    * Add more UTs
---
 .../java/org/apache/rocketmq/store/CommitLog.java  | 12 ++--
 .../rocketmq/store/DefaultMessageStoreTest.java    | 20 ++++++
 .../java/org/apache/rocketmq/store/HATest.java     | 73 ++++++++++++++++++++++
 3 files changed, 97 insertions(+), 8 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index be013e9d4..62e9f59b7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -139,8 +139,6 @@ public class CommitLog implements Swappable {
     }
 
     public void start() {
-        this.flushManager.start();
-        log.info("start commitLog successfully. storeRoot: {}", 
this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
         this.flushManager.start();
         log.info("start commitLog successfully. storeRoot: {}", 
this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
         flushDiskWatcher.setDaemon(true);
@@ -148,8 +146,6 @@ public class CommitLog implements Swappable {
     }
 
     public void shutdown() {
-        this.flushManager.shutdown();
-        log.info("shutdown commitLog successfully. storeRoot: {}", 
this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
         this.flushManager.shutdown();
         log.info("shutdown commitLog successfully. storeRoot: {}", 
this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
         flushDiskWatcher.shutdown(true);
@@ -803,9 +799,9 @@ public class CommitLog implements Swappable {
         }
 
         boolean needHandleHA = needHandleHA(msg);
-        int needAckNums = 1;
+        int needAckNums = 
this.defaultMessageStore.getMessageStoreConfig().getInSyncReplicas();
 
-        if (needHandleHA) {
+        if (needHandleHA && 
this.defaultMessageStore.getBrokerConfig().isEnableSlaveActingMaster()) {
             int inSyncReplicas = 
Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
                 
this.defaultMessageStore.getHaService().inSyncSlaveNums(currOffset) + 1);
             needAckNums = calcNeedAckNums(inSyncReplicas);
@@ -949,10 +945,10 @@ public class CommitLog implements Swappable {
             currOffset = mappedFile.getFileFromOffset() + 
mappedFile.getWrotePosition();
         }
 
-        int needAckNums = 1;
         boolean needHandleHA = needHandleHA(messageExtBatch);
+        int needAckNums = 
this.defaultMessageStore.getMessageStoreConfig().getInSyncReplicas();
 
-        if (needHandleHA) {
+        if (needHandleHA && 
this.defaultMessageStore.getBrokerConfig().isEnableSlaveActingMaster()) {
             int inSyncReplicas = 
Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
                 
this.defaultMessageStore.getHaService().inSyncSlaveNums(currOffset) + 1);
             needAckNums = calcNeedAckNums(inSyncReplicas);
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java 
b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index 1e79820a3..b39b9bb03 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -743,11 +743,31 @@ public class DefaultMessageStoreTest {
         messageStoreConfig.setTotalReplicas(2);
         messageStoreConfig.setInSyncReplicas(2);
         messageStoreConfig.setEnableAutoInSyncReplicas(false);
+        ((DefaultMessageStore) 
this.messageStore).getBrokerConfig().setEnableSlaveActingMaster(true);
         this.messageStore.setAliveReplicaNumInGroup(1);
 
         MessageExtBrokerInner msg = buildMessage();
         PutMessageResult result = this.messageStore.putMessage(msg);
         
assertThat(result.getPutMessageStatus()).isEqualTo(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH);
+        ((DefaultMessageStore) 
this.messageStore).getBrokerConfig().setEnableSlaveActingMaster(false);
+    }
+
+
+    @Test
+    public void testPutMsgWhenAdaptiveDegradation () {
+        MessageStoreConfig messageStoreConfig = ((DefaultMessageStore) 
this.messageStore).getMessageStoreConfig();
+        messageStoreConfig.setBrokerRole(BrokerRole.SYNC_MASTER);
+        messageStoreConfig.setTotalReplicas(2);
+        messageStoreConfig.setInSyncReplicas(2);
+        messageStoreConfig.setEnableAutoInSyncReplicas(true);
+        ((DefaultMessageStore) 
this.messageStore).getBrokerConfig().setEnableSlaveActingMaster(true);
+        this.messageStore.setAliveReplicaNumInGroup(1);
+
+        MessageExtBrokerInner msg = buildMessage();
+        PutMessageResult result = this.messageStore.putMessage(msg);
+        
assertThat(result.getPutMessageStatus()).isEqualTo(PutMessageStatus.PUT_OK);
+        ((DefaultMessageStore) 
this.messageStore).getBrokerConfig().setEnableSlaveActingMaster(false);
+        messageStoreConfig.setEnableAutoInSyncReplicas(false);
     }
 
 
diff --git a/store/src/test/java/org/apache/rocketmq/store/HATest.java 
b/store/src/test/java/org/apache/rocketmq/store/HATest.java
index bee9d6429..ea7bf725f 100644
--- a/store/src/test/java/org/apache/rocketmq/store/HATest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/HATest.java
@@ -146,6 +146,39 @@ public class HATest {
 
         //shutdown slave, putMessage should return FLUSH_SLAVE_TIMEOUT
         slaveMessageStore.shutdown();
+
+        //wait to let master clean the slave's connection
+        Thread.sleep(masterMessageStoreConfig.getHaHousekeepingInterval() + 
500);
+        for (long i = 0; i < totalMsgs; i++) {
+            CompletableFuture<PutMessageResult> putResultFuture = 
messageStore.asyncPutMessage(buildMessage());
+            PutMessageResult result = putResultFuture.get();
+            assertEquals(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, 
result.getPutMessageStatus());
+        }
+    }
+
+    @Test
+    public void testSemiSyncReplicaWhenSlaveActingMaster() throws Exception {
+        long totalMsgs = 5;
+        QUEUE_TOTAL = 1;
+        MessageBody = StoreMessage.getBytes();
+        
((DefaultMessageStore)messageStore).getBrokerConfig().setEnableSlaveActingMaster(true);
+        for (long i = 0; i < totalMsgs; i++) {
+            MessageExtBrokerInner msg = buildMessage();
+            CompletableFuture<PutMessageResult> putResultFuture = 
messageStore.asyncPutMessage(msg);
+            PutMessageResult result = putResultFuture.get();
+            assertEquals(PutMessageStatus.PUT_OK, 
result.getPutMessageStatus());
+            //message has been replicated to slave's commitLog, but maybe not 
dispatch to ConsumeQueue yet
+            //so direct read from commitLog by physical offset
+            MessageExt slaveMsg = 
slaveMessageStore.lookMessageByOffset(result.getAppendMessageResult().getWroteOffset());
+            assertNotNull(slaveMsg);
+            assertTrue(Arrays.equals(msg.getBody(), slaveMsg.getBody()));
+            assertEquals(msg.getTopic(), slaveMsg.getTopic());
+            assertEquals(msg.getTags(), slaveMsg.getTags());
+            assertEquals(msg.getKeys(), slaveMsg.getKeys());
+        }
+
+        //shutdown slave, putMessage should return IN_SYNC_REPLICAS_NOT_ENOUGH
+        slaveMessageStore.shutdown();
         messageStore.setAliveReplicaNumInGroup(1);
 
         //wait to let master clean the slave's connection
@@ -155,6 +188,46 @@ public class HATest {
             PutMessageResult result = putResultFuture.get();
             assertEquals(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, 
result.getPutMessageStatus());
         }
+
+        
((DefaultMessageStore)messageStore).getBrokerConfig().setEnableSlaveActingMaster(false);
+    }
+
+    @Test
+    public void testSemiSyncReplicaWhenAdaptiveDegradation() throws Exception {
+        long totalMsgs = 5;
+        QUEUE_TOTAL = 1;
+        MessageBody = StoreMessage.getBytes();
+        
((DefaultMessageStore)messageStore).getBrokerConfig().setEnableSlaveActingMaster(true);
+        messageStore.getMessageStoreConfig().setEnableAutoInSyncReplicas(true);
+        for (long i = 0; i < totalMsgs; i++) {
+            MessageExtBrokerInner msg = buildMessage();
+            CompletableFuture<PutMessageResult> putResultFuture = 
messageStore.asyncPutMessage(msg);
+            PutMessageResult result = putResultFuture.get();
+            assertEquals(PutMessageStatus.PUT_OK, 
result.getPutMessageStatus());
+            //message has been replicated to slave's commitLog, but maybe not 
dispatch to ConsumeQueue yet
+            //so direct read from commitLog by physical offset
+            MessageExt slaveMsg = 
slaveMessageStore.lookMessageByOffset(result.getAppendMessageResult().getWroteOffset());
+            assertNotNull(slaveMsg);
+            assertTrue(Arrays.equals(msg.getBody(), slaveMsg.getBody()));
+            assertEquals(msg.getTopic(), slaveMsg.getTopic());
+            assertEquals(msg.getTags(), slaveMsg.getTags());
+            assertEquals(msg.getKeys(), slaveMsg.getKeys());
+        }
+
+        //shutdown slave, putMessage should still return PUT_OK because auto in-sync replicas is enabled
+        slaveMessageStore.shutdown();
+        messageStore.setAliveReplicaNumInGroup(1);
+
+        //wait to let master clean the slave's connection
+        Thread.sleep(masterMessageStoreConfig.getHaHousekeepingInterval() + 
500);
+        for (long i = 0; i < totalMsgs; i++) {
+            CompletableFuture<PutMessageResult> putResultFuture = 
messageStore.asyncPutMessage(buildMessage());
+            PutMessageResult result = putResultFuture.get();
+            assertEquals(PutMessageStatus.PUT_OK, 
result.getPutMessageStatus());
+        }
+
+        
((DefaultMessageStore)messageStore).getBrokerConfig().setEnableSlaveActingMaster(false);
+        
messageStore.getMessageStoreConfig().setEnableAutoInSyncReplicas(false);
     }
 
     @After

Reply via email to