RongtongJin commented on code in PR #4355:
URL: https://github.com/apache/rocketmq/pull/4355#discussion_r879109393


##########
store/src/main/java/org/apache/rocketmq/store/CommitLog.java:
##########
@@ -1077,13 +1077,13 @@ private boolean needHandleHA(MessageExt messageExt) {
     }
 
     private CompletableFuture<PutMessageResult> handleDiskFlushAndHA(PutMessageResult putMessageResult,
-        MessageExt messageExt, int needAckNums, boolean needHandleHA) {
+        MessageExt messageExt, int needAckNums, boolean needHandleHA, boolean allAckInSyncStateSet) {

Review Comment:
   Could needAckNums = -1 stand for allAckInSyncStateSet, so that the allAckInSyncStateSet parameter does not have to be passed all the way down the call chain?
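
A minimal sketch of the sentinel idea, under stated assumptions: `AckPolicy`, `ALL_ACK_IN_SYNC_STATE_SET`, and `requiredAcks` are hypothetical names used only for illustration and are not part of this PR or of RocketMQ. It shows how a single `int` could carry both the fixed ack count and the "all replicas in the SyncStateSet" mode.

```java
import java.util.Set;

/** Hypothetical helper, not RocketMQ code: resolves how many acks a put must collect. */
final class AckPolicy {
    /** Assumed sentinel: -1 means "wait for every replica in the SyncStateSet". */
    static final int ALL_ACK_IN_SYNC_STATE_SET = -1;

    private AckPolicy() {
    }

    static boolean isAllAckInSyncStateSet(int needAckNums) {
        return needAckNums == ALL_ACK_IN_SYNC_STATE_SET;
    }

    /** Returns the number of acks required, counting the master as one of them. */
    static int requiredAcks(int needAckNums, Set<String> syncStateSet) {
        if (isAllAckInSyncStateSet(needAckNums)) {
            // At least the master itself must hold the message.
            return Math.max(1, syncStateSet.size());
        }
        return needAckNums;
    }
}
```

With a helper like this, callers would keep passing only `needAckNums`, and the code that finally counts acks would resolve the sentinel at the point of use.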



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java:
##########
@@ -89,12 +103,14 @@ public boolean changeToMaster(int masterEpoch) {
         this.epochCache.appendEntry(newEpochEntry);
 
         this.defaultMessageStore.recoverTopicQueueTable();
+
+        this.syncStateSet.clear();
+        this.syncStateSet.add(this.localAddress);

Review Comment:
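   Another thread uses syncStateSet.size() to decide whether enough acks have arrived, but the update here is split into two steps (clear, then add), so there is likely a concurrency problem.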
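
One possible way to avoid the intermediate (empty) state, sketched under assumptions: `SyncStateSetHolder` and `resetToLocalOnly` are hypothetical names, not the PR's code. The idea is to build the replacement set off to the side and publish it with a single write to a `volatile` reference, so a reader calling `size()` sees either the old set or the new one.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical holder, not the PR's code: publishes the SyncStateSet as one immutable snapshot. */
final class SyncStateSetHolder {
    // volatile: a reader sees either the previous snapshot or the new one, never a half-updated set.
    private volatile Set<String> syncStateSet = Collections.emptySet();

    /** Replaces the whole set with a single reference write. */
    void setSyncStateSet(Set<String> newSyncStateSet) {
        this.syncStateSet = Collections.unmodifiableSet(new HashSet<>(newSyncStateSet));
    }

    /** Equivalent of clear() followed by add(localAddress), but published in one step. */
    void resetToLocalOnly(String localAddress) {
        this.syncStateSet = Collections.singleton(localAddress);
    }

    /** Readers should call this once and work on the returned snapshot. */
    Set<String> getSyncStateSet() {
        return this.syncStateSet;
    }
}
```

A reader that takes the snapshot once per check, then calls `size()` and `contains()` on that same object, works on a consistent view even if the set is replaced concurrently.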



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java:
##########
@@ -118,28 +134,93 @@ public boolean changeToSlave(String newMasterAddr, int newMasterEpoch, Long slav
         }
     }
 
-    @Override
+    @Override public HAClient getHAClient() {
+        return this.haClient;
+    }
+
+    @Override public void updateHaMasterAddress(String newAddr) {
+        if (this.haClient != null) {
+            this.haClient.updateHaMasterAddress(newAddr);
+        }
+    }
+
+    @Override public void updateMasterAddress(String newAddr) {
+    }
+
+    public void registerSyncStateSetChangedListener(final Consumer<Set<String>> listener) {
+        this.syncStateSetChangedListeners.add(listener);
+    }
+
+    public void notifySyncStateSetChanged(final Set<String> newSyncStateSet) {
+        this.executorService.submit(() -> {
+            for (Consumer<Set<String>> listener : syncStateSetChangedListeners) {
+                listener.accept(newSyncStateSet);
+            }
+        });
+    }
+
     public void setSyncStateSet(final Set<String> syncStateSet) {
-        this.syncStateSet = new HashSet<>(syncStateSet);
+        this.syncStateSet.clear();
+        this.syncStateSet.addAll(syncStateSet);

Review Comment:
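   Another thread uses syncStateSet.size() to decide whether enough acks have arrived, but the update here is split into two steps (clear, then addAll), so there is likely a concurrency problem.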



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java:
##########
@@ -279,12 +296,17 @@ protected boolean processReadResult(ByteBuffer byteBufferRead) {
                                     long slaveMaxOffset = byteBufferRead.getLong(readPosition + 4);
                                     ReadSocketService.this.processPosition += AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
 
-                                    slaveAckOffset = slaveMaxOffset;
+                                    AutoSwitchHAConnection.this.slaveAckOffset = slaveMaxOffset;
                                     if (slaveRequestOffset < 0) {
                                         slaveRequestOffset = slaveMaxOffset;
                                     }
                                     LOGGER.info("slave[" + clientAddress + "] request offset " + slaveMaxOffset);
                                     byteBufferRead.position(readSocketPos);
+                                    if (slaveMaxOffset >= AutoSwitchHAConnection.this.lastMasterMaxOffset) {
+                                        AutoSwitchHAConnection.this.lastCatchUpTimeMs = Math.max(AutoSwitchHAConnection.this.lastTransferTimeMs, AutoSwitchHAConnection.this.lastCatchUpTimeMs);
+                                        AutoSwitchHAConnection.this.haService.maybeExpandInSyncStateSet(AutoSwitchHAConnection.this.slaveAddress, slaveMaxOffset);
+                                    }

Review Comment:
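   There is probably a concurrency problem here as well. For example, slaveMaxOffset >= AutoSwitchHAConnection.this.lastMasterMaxOffset may hold, but by then the write side has already updated lastTransferTimeMs, which makes lastCatchUpTimeMs wrong.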
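
A hedged sketch of one way to keep the offset and its timestamp consistent: publish them together as one immutable object, and have the read side load that object exactly once. `CatchUpTracker`, `TransferState`, `onTransfer`, and `onSlaveAck` are hypothetical names for illustration; the sketch also assumes that only the read thread writes `lastCatchUpTimeMs`.

```java
/** Hypothetical sketch, not the PR's code: pairs the master offset with its transfer time. */
final class CatchUpTracker {
    /** Immutable pair written by the transfer (write) thread. */
    static final class TransferState {
        final long masterMaxOffset;
        final long transferTimeMs;

        TransferState(long masterMaxOffset, long transferTimeMs) {
            this.masterMaxOffset = masterMaxOffset;
            this.transferTimeMs = transferTimeMs;
        }
    }

    private volatile TransferState lastTransfer = new TransferState(0L, 0L);
    // Assumption: written only by the read thread, so a plain read-then-write is safe here.
    private volatile long lastCatchUpTimeMs = 0L;

    /** Called by the write thread after each transfer: offset and time are published together. */
    void onTransfer(long masterMaxOffset, long nowMs) {
        this.lastTransfer = new TransferState(masterMaxOffset, nowMs);
    }

    /** Called by the read thread when the slave reports its max offset; true if it has caught up. */
    boolean onSlaveAck(long slaveMaxOffset) {
        final TransferState snapshot = this.lastTransfer; // single read of the pair
        if (slaveMaxOffset >= snapshot.masterMaxOffset) {
            this.lastCatchUpTimeMs = Math.max(snapshot.transferTimeMs, this.lastCatchUpTimeMs);
            return true;
        }
        return false;
    }

    long getLastCatchUpTimeMs() {
        return this.lastCatchUpTimeMs;
    }
}
```

Because the comparison and the timestamp come from the same snapshot, a newer transfer time cannot be credited to a slave that only caught up with an older offset.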



##########
store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java:
##########
@@ -71,28 +74,51 @@ private void doWaitTransfer() {
                     boolean transferOK = false;
 
                     long deadLine = req.getDeadLine();
+                    final boolean allAckInSyncStateSet = req.isAllAckInSyncStateSet();
 
                     for (int i = 0; !transferOK && deadLine - System.nanoTime() > 0; i++) {
                         if (i > 0) {
                             this.notifyTransferObject.waitForRunning(1000);
                         }
 
-                        if (req.getAckNums() <= 1) {
+                        if (req.getAckNums() <= 1 && !allAckInSyncStateSet) {
                             transferOK = haService.getPush2SlaveMaxOffset().get() >= req.getNextOffset();
                             continue;
                         }
 
-                        int ackNums = 0;
-                        for (HAConnection conn : haService.getConnectionList()) {
-                            // TODO: We must ensure every HAConnection represents a different slave
-                            // Solution: Consider assign a unique and fixed IP:ADDR for each different slave
-                            if (conn.getSlaveAckOffset() >= req.getNextOffset()) {
-                                ackNums++;
-                            }
-                            if (ackNums >= req.getAckNums()) {
+                        if (allAckInSyncStateSet && this.haService instanceof AutoSwitchHAService) {
+                            // In this mode, we must wait for all replicas that in InSyncStateSet.
+                            final AutoSwitchHAService autoSwitchHAService = (AutoSwitchHAService) this.haService;
+                            final Set<String> syncStateSet = autoSwitchHAService.getSyncStateSet();
+                            if (syncStateSet.size() <= 1) {
+                                // Only master
                                 transferOK = true;
                                 break;
                             }
+                            int ackNums = 0;
+                            for (HAConnection conn : haService.getConnectionList()) {
+                                final AutoSwitchHAConnection autoSwitchHAConnection = (AutoSwitchHAConnection) conn;
+                                if (syncStateSet.contains(autoSwitchHAConnection.getClientAddress()) && autoSwitchHAConnection.getSlaveAckOffset() >= req.getNextOffset()) {
+                                    ackNums ++;
+                                }
+                                if (ackNums >= syncStateSet.size()) {
+                                    transferOK = true;
+                                    break;
+                                }
+                            }

Review Comment:
   The ackNums here does not count the master itself.
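
A minimal sketch of the counting issue, under assumptions: `InSyncAckCounter` and its `Map` of slave ack offsets are hypothetical stand-ins for the connection list, and the master's address is assumed to be a member of the SyncStateSet (as the `syncStateSet.add(this.localAddress)` line in this PR suggests) but not to appear among the slave connections.

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch, not the PR's code: counts acks against the SyncStateSet size. */
final class InSyncAckCounter {
    private InSyncAckCounter() {
    }

    /**
     * @param syncStateSet    addresses of all in-sync replicas, master included (assumption)
     * @param slaveAckOffsets slave address to acknowledged offset, one entry per connection
     * @param nextOffset      offset that every in-sync replica must have reached
     */
    static boolean allInSyncReplicasAcked(Set<String> syncStateSet,
        Map<String, Long> slaveAckOffsets, long nextOffset) {
        // The master already holds the message, so it counts as the first ack.
        int ackNums = 1;
        for (Map.Entry<String, Long> entry : slaveAckOffsets.entrySet()) {
            if (syncStateSet.contains(entry.getKey()) && entry.getValue() >= nextOffset) {
                ackNums++;
            }
        }
        return ackNums >= syncStateSet.size();
    }
}
```

Starting from 0 instead, a set of one master and two slaves could never reach `syncStateSet.size()` (2 acks against a size of 3), so the request would wait until its deadline. Comparing against `syncStateSet.size() - 1` would be an equivalent alternative.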



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
