RongtongJin commented on code in PR #4355:
URL: https://github.com/apache/rocketmq/pull/4355#discussion_r879109393
##########
store/src/main/java/org/apache/rocketmq/store/CommitLog.java:
##########
@@ -1077,13 +1077,13 @@ private boolean needHandleHA(MessageExt messageExt) {
}
private CompletableFuture<PutMessageResult>
handleDiskFlushAndHA(PutMessageResult putMessageResult,
- MessageExt messageExt, int needAckNums, boolean needHandleHA) {
+ MessageExt messageExt, int needAckNums, boolean needHandleHA, boolean allAckInSyncStateSet) {
Review Comment:
Could needAckNums = -1 stand for allAckInSyncStateSet instead? Then the allAckInSyncStateSet parameter would not need to be passed all the way down the call chain.
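A minimal sketch of the suggestion above. The constant ALL_ACK_IN_SYNC_STATE_SET and the helper requiredAcks are hypothetical names, not RocketMQ API. The idea is that the request carries only needAckNums, and the sentinel is resolved into a concrete ack count at the single place where the SyncStateSet is read. The addresses in main are illustrative.

```java
import java.util.Set;

public class AckSentinelSketch {
    // Hypothetical sentinel: needAckNums == -1 means "every replica in the SyncStateSet must ack".
    public static final int ALL_ACK_IN_SYNC_STATE_SET = -1;

    // Resolves how many acks (master included) a request must collect.
    // The SyncStateSet is consulted only here, so no extra boolean travels down the call chain.
    public static int requiredAcks(int needAckNums, Set<String> syncStateSet) {
        if (needAckNums == ALL_ACK_IN_SYNC_STATE_SET) {
            return syncStateSet.size();
        }
        return needAckNums;
    }

    public static void main(String[] args) {
        Set<String> syncStateSet = Set.of("master:10912", "slave1:10912", "slave2:10912");
        System.out.println(requiredAcks(ALL_ACK_IN_SYNC_STATE_SET, syncStateSet)); // 3
        System.out.println(requiredAcks(2, syncStateSet)); // 2
    }
}
```

One thing to watch with this design: the existing shortcut `if (req.getAckNums() <= 1)` in GroupTransferService would also match -1, so the sentinel has to be checked before that branch.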
##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java:
##########
@@ -89,12 +103,14 @@ public boolean changeToMaster(int masterEpoch) {
this.epochCache.appendEntry(newEpochEntry);
this.defaultMessageStore.recoverTopicQueueTable();
+
+ this.syncStateSet.clear();
+ this.syncStateSet.add(this.localAddress);
Review Comment:
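Another thread uses syncStateSet.size() to decide whether enough acks have arrived, but this update is split into two steps (clear, then add), so there should be a concurrency issue.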
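One way to make the update atomic, sketched under the assumption that syncStateSet can be turned into a volatile reference to an immutable set: writers build the new set completely and publish it with a single volatile write, so a reader sees either the old set or the new one, never the empty state between clear() and add(). The class and method names below are hypothetical.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class SyncStateSetSnapshotSketch {
    // Always points at an immutable set; replaced wholesale, never mutated in place.
    private volatile Set<String> syncStateSet = Collections.emptySet();

    // Same effect as "clear(); add(localAddress)" in changeToMaster, but as one publish.
    public void resetToMasterOnly(String localAddress) {
        this.syncStateSet = Collections.singleton(localAddress);
    }

    // Same effect as "clear(); addAll(newSet)" in setSyncStateSet, but as one publish.
    public void setSyncStateSet(Set<String> newSet) {
        this.syncStateSet = Collections.unmodifiableSet(new HashSet<>(newSet));
    }

    // Callers should read this once into a local and use that snapshot for both size() and contains().
    public Set<String> getSyncStateSet() {
        return this.syncStateSet;
    }

    public static void main(String[] args) {
        SyncStateSetSnapshotSketch holder = new SyncStateSetSnapshotSketch();
        holder.setSyncStateSet(Set.of("master:10912", "slave1:10912"));
        Set<String> snapshot = holder.getSyncStateSet();
        holder.resetToMasterOnly("master:10912");
        System.out.println(snapshot.size()); // 2 (the old snapshot is unaffected)
        System.out.println(holder.getSyncStateSet().size()); // 1
    }
}
```

This pairs naturally with the reader in GroupTransferService, which already copies getSyncStateSet() into a local before looping; with an immutable snapshot, that local stays consistent for the whole check.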
##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java:
##########
@@ -118,28 +134,93 @@ public boolean changeToSlave(String newMasterAddr, int newMasterEpoch, Long slav
}
}
- @Override
+ @Override public HAClient getHAClient() {
+ return this.haClient;
+ }
+
+ @Override public void updateHaMasterAddress(String newAddr) {
+ if (this.haClient != null) {
+ this.haClient.updateHaMasterAddress(newAddr);
+ }
+ }
+
+ @Override public void updateMasterAddress(String newAddr) {
+ }
+
+ public void registerSyncStateSetChangedListener(final Consumer<Set<String>> listener) {
+ this.syncStateSetChangedListeners.add(listener);
+ }
+
+ public void notifySyncStateSetChanged(final Set<String> newSyncStateSet) {
+ this.executorService.submit(() -> {
+ for (Consumer<Set<String>> listener : syncStateSetChangedListeners) {
+ listener.accept(newSyncStateSet);
+ }
+ });
+ }
+
public void setSyncStateSet(final Set<String> syncStateSet) {
- this.syncStateSet = new HashSet<>(syncStateSet);
+ this.syncStateSet.clear();
+ this.syncStateSet.addAll(syncStateSet);
Review Comment:
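Another thread uses syncStateSet.size() to decide whether enough acks have arrived, but this update is split into two steps (clear, then addAll), so there should be a concurrency issue.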
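If the set has to stay mutable (for example, if members are also added one at a time as slaves catch up), an alternative is to guard every write and every multi-step read with a ReentrantReadWriteLock. This is a sketch with hypothetical names, not the PR's code; it keeps clear() and addAll() but makes them indivisible from a reader's point of view.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockedSyncStateSetSketch {
    private final Set<String> syncStateSet = new HashSet<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    // clear() and addAll() run under one write lock, so no reader observes the empty intermediate state.
    public void setSyncStateSet(Set<String> newSet) {
        lock.writeLock().lock();
        try {
            syncStateSet.clear();
            syncStateSet.addAll(newSet);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Incremental updates take the same write lock.
    public void add(String address) {
        lock.writeLock().lock();
        try {
            syncStateSet.add(address);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Readers copy under the read lock, then run size()/contains() on their private copy.
    public Set<String> snapshot() {
        lock.readLock().lock();
        try {
            return new HashSet<>(syncStateSet);
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        LockedSyncStateSetSketch holder = new LockedSyncStateSetSketch();
        holder.setSyncStateSet(Set.of("master:10912", "slave1:10912"));
        holder.add("slave2:10912");
        System.out.println(holder.snapshot().size()); // 3
    }
}
```

Compared with the volatile-snapshot approach, this costs a lock acquisition and a copy per read, but it lets writers keep mutating a single set in place.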
##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java:
##########
@@ -279,12 +296,17 @@ protected boolean processReadResult(ByteBuffer byteBufferRead) {
long slaveMaxOffset = byteBufferRead.getLong(readPosition + 4);
ReadSocketService.this.processPosition += AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
- slaveAckOffset = slaveMaxOffset;
+ AutoSwitchHAConnection.this.slaveAckOffset = slaveMaxOffset;
if (slaveRequestOffset < 0) {
slaveRequestOffset = slaveMaxOffset;
}
LOGGER.info("slave[" + clientAddress + "] request offset " + slaveMaxOffset);
byteBufferRead.position(readSocketPos);
+ if (slaveMaxOffset >= AutoSwitchHAConnection.this.lastMasterMaxOffset) {
+ AutoSwitchHAConnection.this.lastCatchUpTimeMs = Math.max(AutoSwitchHAConnection.this.lastTransferTimeMs, AutoSwitchHAConnection.this.lastCatchUpTimeMs);
+ AutoSwitchHAConnection.this.haService.maybeExpandInSyncStateSet(AutoSwitchHAConnection.this.slaveAddress, slaveMaxOffset);
+ }
Review Comment:
There should be a concurrency issue here too. For example, slaveMaxOffset >= AutoSwitchHAConnection.this.lastMasterMaxOffset holds, but the write side has already updated lastTransferTimeMs, so lastCatchUpTimeMs ends up wrong.
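A sketch of one possible fix, assuming the write side can publish lastMasterMaxOffset and lastTransferTimeMs together: bundle them into one immutable object held in a volatile field, and have the read side take a single snapshot before comparing. That way the ack is always compared against an offset and a timestamp from the same transfer. TransferMark, onTransfer and onSlaveAck are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicLong;

public class CatchUpTrackerSketch {
    // Immutable pair: the offset pushed to the slave and the time that push happened.
    static final class TransferMark {
        final long masterMaxOffset;
        final long transferTimeMs;

        TransferMark(long masterMaxOffset, long transferTimeMs) {
            this.masterMaxOffset = masterMaxOffset;
            this.transferTimeMs = transferTimeMs;
        }
    }

    private volatile TransferMark lastTransfer = new TransferMark(0L, 0L);
    private final AtomicLong lastCatchUpTimeMs = new AtomicLong(0L);

    // Write side: publish offset and timestamp in one volatile write.
    public void onTransfer(long masterMaxOffset, long nowMs) {
        this.lastTransfer = new TransferMark(masterMaxOffset, nowMs);
    }

    // Read side: read the pair once, so the comparison and the timestamp come from the same transfer.
    public void onSlaveAck(long slaveMaxOffset) {
        TransferMark mark = this.lastTransfer;
        if (slaveMaxOffset >= mark.masterMaxOffset) {
            lastCatchUpTimeMs.accumulateAndGet(mark.transferTimeMs, Math::max);
        }
    }

    public long getLastCatchUpTimeMs() {
        return lastCatchUpTimeMs.get();
    }

    public static void main(String[] args) {
        CatchUpTrackerSketch tracker = new CatchUpTrackerSketch();
        tracker.onTransfer(100L, 1_000L);
        tracker.onSlaveAck(100L); // caught up with the transfer recorded at t=1000
        tracker.onTransfer(200L, 2_000L);
        tracker.onSlaveAck(150L); // behind offset 200, so the catch-up time stays at 1000
        System.out.println(tracker.getLastCatchUpTimeMs()); // 1000
    }
}
```

The second ack shows the case from the review: with separate fields, a reader could pair the old offset 100 with the new timestamp 2000; here the snapshot pairs 200 with 2000, so the stale catch-up is correctly ignored.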
##########
store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java:
##########
@@ -71,28 +74,51 @@ private void doWaitTransfer() {
boolean transferOK = false;
long deadLine = req.getDeadLine();
+ final boolean allAckInSyncStateSet = req.isAllAckInSyncStateSet();
for (int i = 0; !transferOK && deadLine - System.nanoTime() > 0; i++) {
if (i > 0) {
this.notifyTransferObject.waitForRunning(1000);
}
- if (req.getAckNums() <= 1) {
+ if (req.getAckNums() <= 1 && !allAckInSyncStateSet) {
transferOK = haService.getPush2SlaveMaxOffset().get() >= req.getNextOffset();
continue;
}
- int ackNums = 0;
- for (HAConnection conn : haService.getConnectionList()) {
- // TODO: We must ensure every HAConnection represents a different slave
- // Solution: Consider assign a unique and fixed IP:ADDR for each different slave
- if (conn.getSlaveAckOffset() >= req.getNextOffset()) {
- ackNums++;
- }
- if (ackNums >= req.getAckNums()) {
+ if (allAckInSyncStateSet && this.haService instanceof AutoSwitchHAService) {
+ // In this mode, we must wait for all replicas that in InSyncStateSet.
+ final AutoSwitchHAService autoSwitchHAService = (AutoSwitchHAService) this.haService;
+ final Set<String> syncStateSet = autoSwitchHAService.getSyncStateSet();
+ if (syncStateSet.size() <= 1) {
+ // Only master
transferOK = true;
break;
}
+ int ackNums = 0;
+ for (HAConnection conn : haService.getConnectionList()) {
+ final AutoSwitchHAConnection autoSwitchHAConnection = (AutoSwitchHAConnection) conn;
+ if (syncStateSet.contains(autoSwitchHAConnection.getClientAddress()) && autoSwitchHAConnection.getSlaveAckOffset() >= req.getNextOffset()) {
+ ackNums ++;
+ }
+ if (ackNums >= syncStateSet.size()) {
+ transferOK = true;
+ break;
+ }
+ }
Review Comment:
ackNums here does not count the master itself.
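Why this matters: changeToMaster adds localAddress to syncStateSet, so the set includes the master, while the loop only counts slave connections. Assuming one connection per slave, ackNums tops out at syncStateSet.size() - 1, so ackNums >= syncStateSet.size() never holds and the wait can only end at the deadline. A minimal sketch of the corrected count, with a hypothetical SlaveAck type standing in for AutoSwitchHAConnection:

```java
import java.util.List;
import java.util.Set;

public class SyncStateAckCounterSketch {
    // Hypothetical minimal view of one slave connection: its address and acked offset.
    static final class SlaveAck {
        final String clientAddress;
        final long slaveAckOffset;

        SlaveAck(String clientAddress, long slaveAckOffset) {
            this.clientAddress = clientAddress;
            this.slaveAckOffset = slaveAckOffset;
        }
    }

    // True once every member of syncStateSet, master included, has reached nextOffset.
    public static boolean allInSyncStateSetAcked(Set<String> syncStateSet, List<SlaveAck> connections, long nextOffset) {
        // The master has already written the message locally, so it starts as one ack.
        int ackNums = 1;
        for (SlaveAck conn : connections) {
            if (syncStateSet.contains(conn.clientAddress) && conn.slaveAckOffset >= nextOffset) {
                ackNums++;
            }
        }
        return ackNums >= syncStateSet.size();
    }

    public static void main(String[] args) {
        Set<String> syncStateSet = Set.of("master:10912", "slave1:10912", "slave2:10912");
        List<SlaveAck> conns = List.of(
            new SlaveAck("slave1:10912", 100L),
            new SlaveAck("slave2:10912", 100L));
        System.out.println(allInSyncStateSetAcked(syncStateSet, conns, 100L)); // true
        System.out.println(allInSyncStateSetAcked(syncStateSet, conns, 101L)); // false
    }
}
```

The one-connection-per-slave assumption is the same one flagged by the TODO removed in this hunk; if two connections could share an address, they would be counted twice.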
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.