This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch 5.0.0-beta
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta by this push:
new ef37465e5 Fix bug that return IN_SYNC_REPLICAS_NOT_ENOUGH when
enableSlaveActingMaster is false (#4554)
ef37465e5 is described below
commit ef37465e593a06fdef4eb74b539a4146a4d4839c
Author: rongtong <[email protected]>
AuthorDate: Tue Jul 5 10:15:47 2022 +0800
Fix bug that return IN_SYNC_REPLICAS_NOT_ENOUGH when
enableSlaveActingMaster is false (#4554)
* Automatic degradation is not performed when enableSlaveActingMaster=false
* Add more UTs
---
.../java/org/apache/rocketmq/store/CommitLog.java | 12 ++--
.../rocketmq/store/DefaultMessageStoreTest.java | 20 ++++++
.../java/org/apache/rocketmq/store/HATest.java | 73 ++++++++++++++++++++++
3 files changed, 97 insertions(+), 8 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index be013e9d4..62e9f59b7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -139,8 +139,6 @@ public class CommitLog implements Swappable {
}
public void start() {
- this.flushManager.start();
- log.info("start commitLog successfully. storeRoot: {}",
this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
this.flushManager.start();
log.info("start commitLog successfully. storeRoot: {}",
this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
flushDiskWatcher.setDaemon(true);
@@ -148,8 +146,6 @@ public class CommitLog implements Swappable {
}
public void shutdown() {
- this.flushManager.shutdown();
- log.info("shutdown commitLog successfully. storeRoot: {}",
this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
this.flushManager.shutdown();
log.info("shutdown commitLog successfully. storeRoot: {}",
this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
flushDiskWatcher.shutdown(true);
@@ -803,9 +799,9 @@ public class CommitLog implements Swappable {
}
boolean needHandleHA = needHandleHA(msg);
- int needAckNums = 1;
+ int needAckNums =
this.defaultMessageStore.getMessageStoreConfig().getInSyncReplicas();
- if (needHandleHA) {
+ if (needHandleHA &&
this.defaultMessageStore.getBrokerConfig().isEnableSlaveActingMaster()) {
int inSyncReplicas =
Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
this.defaultMessageStore.getHaService().inSyncSlaveNums(currOffset) + 1);
needAckNums = calcNeedAckNums(inSyncReplicas);
@@ -949,10 +945,10 @@ public class CommitLog implements Swappable {
currOffset = mappedFile.getFileFromOffset() +
mappedFile.getWrotePosition();
}
- int needAckNums = 1;
boolean needHandleHA = needHandleHA(messageExtBatch);
+ int needAckNums =
this.defaultMessageStore.getMessageStoreConfig().getInSyncReplicas();
- if (needHandleHA) {
+ if (needHandleHA &&
this.defaultMessageStore.getBrokerConfig().isEnableSlaveActingMaster()) {
int inSyncReplicas =
Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
this.defaultMessageStore.getHaService().inSyncSlaveNums(currOffset) + 1);
needAckNums = calcNeedAckNums(inSyncReplicas);
diff --git
a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index 1e79820a3..b39b9bb03 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -743,11 +743,31 @@ public class DefaultMessageStoreTest {
messageStoreConfig.setTotalReplicas(2);
messageStoreConfig.setInSyncReplicas(2);
messageStoreConfig.setEnableAutoInSyncReplicas(false);
+ ((DefaultMessageStore)
this.messageStore).getBrokerConfig().setEnableSlaveActingMaster(true);
this.messageStore.setAliveReplicaNumInGroup(1);
MessageExtBrokerInner msg = buildMessage();
PutMessageResult result = this.messageStore.putMessage(msg);
assertThat(result.getPutMessageStatus()).isEqualTo(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH);
+ ((DefaultMessageStore)
this.messageStore).getBrokerConfig().setEnableSlaveActingMaster(false);
+ }
+
+
+ @Test
+ public void testPutMsgWhenAdaptiveDegradation () {
+ MessageStoreConfig messageStoreConfig = ((DefaultMessageStore)
this.messageStore).getMessageStoreConfig();
+ messageStoreConfig.setBrokerRole(BrokerRole.SYNC_MASTER);
+ messageStoreConfig.setTotalReplicas(2);
+ messageStoreConfig.setInSyncReplicas(2);
+ messageStoreConfig.setEnableAutoInSyncReplicas(true);
+ ((DefaultMessageStore)
this.messageStore).getBrokerConfig().setEnableSlaveActingMaster(true);
+ this.messageStore.setAliveReplicaNumInGroup(1);
+
+ MessageExtBrokerInner msg = buildMessage();
+ PutMessageResult result = this.messageStore.putMessage(msg);
+
assertThat(result.getPutMessageStatus()).isEqualTo(PutMessageStatus.PUT_OK);
+ ((DefaultMessageStore)
this.messageStore).getBrokerConfig().setEnableSlaveActingMaster(false);
+ messageStoreConfig.setEnableAutoInSyncReplicas(false);
}
diff --git a/store/src/test/java/org/apache/rocketmq/store/HATest.java
b/store/src/test/java/org/apache/rocketmq/store/HATest.java
index bee9d6429..ea7bf725f 100644
--- a/store/src/test/java/org/apache/rocketmq/store/HATest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/HATest.java
@@ -146,6 +146,39 @@ public class HATest {
//shutdown slave, putMessage should return FLUSH_SLAVE_TIMEOUT
slaveMessageStore.shutdown();
+
+ //wait to let master clean the slave's connection
+ Thread.sleep(masterMessageStoreConfig.getHaHousekeepingInterval() +
500);
+ for (long i = 0; i < totalMsgs; i++) {
+ CompletableFuture<PutMessageResult> putResultFuture =
messageStore.asyncPutMessage(buildMessage());
+ PutMessageResult result = putResultFuture.get();
+ assertEquals(PutMessageStatus.FLUSH_SLAVE_TIMEOUT,
result.getPutMessageStatus());
+ }
+ }
+
+ @Test
+ public void testSemiSyncReplicaWhenSlaveActingMaster() throws Exception {
+ long totalMsgs = 5;
+ QUEUE_TOTAL = 1;
+ MessageBody = StoreMessage.getBytes();
+
((DefaultMessageStore)messageStore).getBrokerConfig().setEnableSlaveActingMaster(true);
+ for (long i = 0; i < totalMsgs; i++) {
+ MessageExtBrokerInner msg = buildMessage();
+ CompletableFuture<PutMessageResult> putResultFuture =
messageStore.asyncPutMessage(msg);
+ PutMessageResult result = putResultFuture.get();
+ assertEquals(PutMessageStatus.PUT_OK,
result.getPutMessageStatus());
+ //message has been replicated to slave's commitLog, but may not have been
dispatched to ConsumeQueue yet
+ //so direct read from commitLog by physical offset
+ MessageExt slaveMsg =
slaveMessageStore.lookMessageByOffset(result.getAppendMessageResult().getWroteOffset());
+ assertNotNull(slaveMsg);
+ assertTrue(Arrays.equals(msg.getBody(), slaveMsg.getBody()));
+ assertEquals(msg.getTopic(), slaveMsg.getTopic());
+ assertEquals(msg.getTags(), slaveMsg.getTags());
+ assertEquals(msg.getKeys(), slaveMsg.getKeys());
+ }
+
+ //shutdown slave, putMessage should return IN_SYNC_REPLICAS_NOT_ENOUGH
+ slaveMessageStore.shutdown();
messageStore.setAliveReplicaNumInGroup(1);
//wait to let master clean the slave's connection
@@ -155,6 +188,46 @@ public class HATest {
PutMessageResult result = putResultFuture.get();
assertEquals(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH,
result.getPutMessageStatus());
}
+
+
((DefaultMessageStore)messageStore).getBrokerConfig().setEnableSlaveActingMaster(false);
+ }
+
+ @Test
+ public void testSemiSyncReplicaWhenAdaptiveDegradation() throws Exception {
+ long totalMsgs = 5;
+ QUEUE_TOTAL = 1;
+ MessageBody = StoreMessage.getBytes();
+
((DefaultMessageStore)messageStore).getBrokerConfig().setEnableSlaveActingMaster(true);
+ messageStore.getMessageStoreConfig().setEnableAutoInSyncReplicas(true);
+ for (long i = 0; i < totalMsgs; i++) {
+ MessageExtBrokerInner msg = buildMessage();
+ CompletableFuture<PutMessageResult> putResultFuture =
messageStore.asyncPutMessage(msg);
+ PutMessageResult result = putResultFuture.get();
+ assertEquals(PutMessageStatus.PUT_OK,
result.getPutMessageStatus());
+ //message has been replicated to slave's commitLog, but may not have been
dispatched to ConsumeQueue yet
+ //so direct read from commitLog by physical offset
+ MessageExt slaveMsg =
slaveMessageStore.lookMessageByOffset(result.getAppendMessageResult().getWroteOffset());
+ assertNotNull(slaveMsg);
+ assertTrue(Arrays.equals(msg.getBody(), slaveMsg.getBody()));
+ assertEquals(msg.getTopic(), slaveMsg.getTopic());
+ assertEquals(msg.getTags(), slaveMsg.getTags());
+ assertEquals(msg.getKeys(), slaveMsg.getKeys());
+ }
+
+ //shutdown slave, putMessage should still return PUT_OK because enableAutoInSyncReplicas is on
+ slaveMessageStore.shutdown();
+ messageStore.setAliveReplicaNumInGroup(1);
+
+ //wait to let master clean the slave's connection
+ Thread.sleep(masterMessageStoreConfig.getHaHousekeepingInterval() +
500);
+ for (long i = 0; i < totalMsgs; i++) {
+ CompletableFuture<PutMessageResult> putResultFuture =
messageStore.asyncPutMessage(buildMessage());
+ PutMessageResult result = putResultFuture.get();
+ assertEquals(PutMessageStatus.PUT_OK,
result.getPutMessageStatus());
+ }
+
+
((DefaultMessageStore)messageStore).getBrokerConfig().setEnableSlaveActingMaster(false);
+
messageStore.getMessageStoreConfig().setEnableAutoInSyncReplicas(false);
}
@After