RongtongJin commented on code in PR #4236:
URL: https://github.com/apache/rocketmq/pull/4236#discussion_r866603589


##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.ha.DefaultHAService;
+import org.apache.rocketmq.store.ha.GroupTransferService;
+import org.apache.rocketmq.store.ha.HAConnection;
+import org.apache.rocketmq.store.ha.HAConnectionStateNotificationService;
+
+/**
+ * SwitchAble ha service, support switch role to master or slave.
+ */
+public class AutoSwitchHAService extends DefaultHAService {
+    private static final InternalLogger LOGGER = 
InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private EpochFileCache epochCache;
+    private AutoSwitchHAClient haClient;
+
+    public AutoSwitchHAService() {
+    }
+
+    @Override
+    public void init(final DefaultMessageStore defaultMessageStore) throws 
IOException {
+        this.epochCache = new 
EpochFileCache(defaultMessageStore.getMessageStoreConfig().getStorePathEpochFile());
+        this.epochCache.initCacheFromFile();
+        this.defaultMessageStore = defaultMessageStore;
+        this.acceptSocketService =
+            new 
AutoSwitchAcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
+        this.groupTransferService = new GroupTransferService(this, 
defaultMessageStore);
+        if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() 
== BrokerRole.SLAVE) {
+            this.haClient = new AutoSwitchHAClient(this, defaultMessageStore, 
this.epochCache);
+        }
+        this.haConnectionStateNotificationService = new 
HAConnectionStateNotificationService(this, defaultMessageStore);
+    }
+
+    @Override
+    public boolean changeToMaster(int masterEpoch) {
+        final int lastEpoch = this.epochCache.lastEpoch();
+        if (masterEpoch <= lastEpoch) {
+            return false;
+        }
+        destroyConnections();
+        // Stop ha client if needed
+        if (this.haClient != null) {
+            this.haClient.shutdown();
+        }
+
+        // Truncate dirty file
+        final long truncateOffset = truncateInvalidMsg();
+        if (truncateOffset >= 0) {
+            this.epochCache.truncateSuffixByOffset(truncateOffset);
+        }
+
+        // Append new epoch to epochFile
+        final EpochEntry newEpochEntry = new EpochEntry(masterEpoch, 
this.defaultMessageStore.getMaxPhyOffset());
+        if (this.epochCache.lastEpoch() >= masterEpoch) {
+            this.epochCache.truncateSuffixByEpoch(masterEpoch);
+        }
+        this.epochCache.appendEntry(newEpochEntry);
+
+        this.defaultMessageStore.recoverTopicQueueTable();
+        LOGGER.info("Change ha to master success, newMasterEpoch:{}, 
startOffset:{}", masterEpoch, newEpochEntry.getStartOffset());
+        return true;
+    }
+
+    @Override
+    public boolean changeToSlave(String newMasterAddr, String newHaMasterAddr, 
int newMasterEpoch) {
+        final int lastEpoch = this.epochCache.lastEpoch();
+        if (newMasterEpoch <= lastEpoch) {
+            return false;
+        }
+        try {
+            destroyConnections();
+            if (this.haClient == null) {
+                this.haClient = new AutoSwitchHAClient(this, 
defaultMessageStore, this.epochCache);
+            } else {
+                this.haClient.reOpen();
+            }
+            this.haClient.updateHaMasterAddress(newHaMasterAddr);
+            this.haClient.updateMasterAddress(newMasterAddr);
+            this.haClient.start();
+            LOGGER.info("Change ha to slave success, newMasterAddress:{}, 
newMasterEpoch:{}", newMasterAddr, newMasterEpoch);
+            return true;
+        } catch (final Exception e) {
+            LOGGER.error("Error happen when change ha to slave", e);
+            return false;
+        }
+    }
+
+    @Override
+    public Set<String> checkSyncStateSetChanged() {
+        final HashSet<String> newSyncStateSet = new 
HashSet<>(this.connectionList.size());
+        final long masterOffset = this.defaultMessageStore.getMaxPhyOffset();
+        for (HAConnection connection : this.connectionList) {
+            if (isInSyncSlave(masterOffset, connection)) {
+                newSyncStateSet.add(connection.getClientAddress());
+            }
+        }
+        return newSyncStateSet;
+    }
+
+    public void truncateEpochFilePrefix(final long offset) {
+        this.epochCache.truncatePrefixByOffset(offset);
+    }
+
+    public void truncateEpochFileSuffix(final long offset) {
+        this.epochCache.truncateSuffixByOffset(offset);
+    }
+
+    /**
+     * Try to truncate incomplete msg transferred from master.
+     */
+    public long truncateInvalidMsg() {
+        long dispatchBehind = this.defaultMessageStore.dispatchBehindBytes();
+        if (dispatchBehind <= 0) {
+            LOGGER.info("Dispatch complete, skip truncate");
+            return -1;
+        }
+
+        long reputFromOffset = this.defaultMessageStore.getMaxPhyOffset() - 
dispatchBehind;
+
+        boolean doNext = true;
+        while (reputFromOffset < this.defaultMessageStore.getMaxPhyOffset() && 
doNext) {
+            SelectMappedBufferResult result = 
this.defaultMessageStore.getCommitLog().getData(reputFromOffset);
+            if (result == null) {
+                break;
+            }
+
+            try {
+                reputFromOffset = result.getStartOffset();
+
+                int readSize = 0;
+                while (readSize < result.getSize()) {
+                    DispatchRequest dispatchRequest = 
this.defaultMessageStore.getCommitLog().checkMessageAndReturnSize(result.getByteBuffer(),
 false, false);
+
+                    int size = dispatchRequest.getMsgSize();
+
+                    if (dispatchRequest.isSuccess()) {
+                        if (size > 0) {
+                            reputFromOffset += size;
+                            readSize += size;
+                        } else {
+                            reputFromOffset = 
this.defaultMessageStore.getCommitLog().rollNextFile(reputFromOffset);
+                            break;
+                        }
+                    } else {
+                        doNext = false;
+                        break;
+                    }
+                }
+            } finally {
+                result.release();
+            }
+        }
+
+        LOGGER.info("AutoRecoverHAClient truncate commitLog to {}", 
reputFromOffset);
+        this.defaultMessageStore.truncateDirtyFiles(reputFromOffset);
+        return reputFromOffset;
+    }
+
+    /**
+     * Get confirm offset (min slaveAckOffset of all syncStateSet)
+     */
+    public long getConfirmOffset() {
+        long confirmOffset = this.defaultMessageStore.getMaxPhyOffset();
+        for (HAConnection connection : this.connectionList) {
+            confirmOffset = Math.min(confirmOffset, 
connection.getSlaveAckOffset());
+        }
+        return confirmOffset;
+    }

Review Comment:
   这个后面还得改,得筛选出在SyncStateSet中的connnection然后比较位点



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java:
##########
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.ha.FlowMonitor;
+import org.apache.rocketmq.store.ha.HAClient;
+import org.apache.rocketmq.store.ha.HAConnectionState;
+import org.apache.rocketmq.store.ha.io.AbstractHAReader;
+import org.apache.rocketmq.store.ha.io.HAWriter;
+
+public class AutoSwitchHAClient extends ServiceThread implements HAClient {
+
+
+    /**
+     * Transfer header buffer size. Schema: state ordinal + additional 
info(maxOffset or flag)
+     * If in handshake state, we reuse additional info as the flag -- 
isSyncFromLastFile.
+     */
+    public static final int TRANSFER_HEADER_SIZE = 4 + 8;
+    private static final InternalLogger LOGGER = 
InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
+    private final AtomicReference<String> masterHaAddress = new 
AtomicReference<>();
+    private final AtomicReference<String> masterAddress = new 
AtomicReference<>();
+    private final ByteBuffer transferHeaderBuffer = 
ByteBuffer.allocate(TRANSFER_HEADER_SIZE);
+    private final AutoSwitchHAService haService;
+    private final ByteBuffer byteBufferRead = 
ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
+    private final DefaultMessageStore messageStore;
+    private final EpochFileCache epochCache;
+
+    private SocketChannel socketChannel;
+    private Selector selector;
+    private AbstractHAReader haReader;
+    private HAWriter haWriter;
+    private FlowMonitor flowMonitor;
+    /**
+     * last time that slave reads date from master.
+     */
+    private long lastReadTimestamp;
+    /**
+     * last time that slave reports offset to master.
+     */
+    private long lastWriteTimestamp;
+
+    private long currentReportedOffset;
+    private int processPosition;
+    private volatile HAConnectionState currentState;
+    /**
+     * Current epoch
+     */
+    private volatile long currentReceivedEpoch;
+
+    /**
+     * Confirm offset = min(localMaxOffset, master confirm offset).
+     */
+    private volatile long confirmOffset;
+
+    public static final int SYNC_FROM_LAST_FILE = -1;
+
+    public static final int SYNC_FROM_FIRST_FILE = -2;
+
+    public AutoSwitchHAClient(AutoSwitchHAService haService, 
DefaultMessageStore defaultMessageStore,
+        EpochFileCache epochCache) throws IOException {
+        this.haService = haService;
+        this.messageStore = defaultMessageStore;
+        this.epochCache = epochCache;
+        init();
+    }
+
+    public void init() throws IOException {
+        this.selector = RemotingUtil.openSelector();
+        this.flowMonitor = new 
FlowMonitor(this.messageStore.getMessageStoreConfig());
+        this.haReader = new HAClientReader();
+        haReader.registerHook(readSize -> {
+            if (readSize > 0) {
+                
AutoSwitchHAClient.this.flowMonitor.addByteCountTransferred(readSize);
+                lastReadTimestamp = System.currentTimeMillis();
+            }
+        });
+        this.haWriter = new HAWriter();
+        haWriter.registerHook(writeSize -> {
+            if (writeSize > 0) {
+                lastWriteTimestamp = System.currentTimeMillis();
+            }
+        });
+        changeCurrentState(HAConnectionState.READY);
+        this.currentReceivedEpoch = -1;
+        this.currentReportedOffset = 0;
+        this.processPosition = 0;
+        this.confirmOffset = -1;
+        this.lastReadTimestamp = System.currentTimeMillis();
+        this.lastWriteTimestamp = System.currentTimeMillis();
+    }
+
+    public void reOpen() throws IOException {
+        shutdown();
+        init();
+    }
+
+    @Override public String getServiceName() {
+        return AutoSwitchHAClient.class.getSimpleName();
+    }
+
+    @Override public void updateMasterAddress(String newAddress) {
+        String currentAddr = this.masterAddress.get();
+        if (masterAddress.compareAndSet(currentAddr, newAddress)) {
+            LOGGER.info("update master address, OLD: " + currentAddr + " NEW: 
" + newAddress);
+        }
+    }
+
+    @Override public void updateHaMasterAddress(String newAddress) {
+        String currentAddr = this.masterHaAddress.get();
+        if (masterHaAddress.compareAndSet(currentAddr, newAddress)) {
+            LOGGER.info("update master ha address, OLD: " + currentAddr + " 
NEW: " + newAddress);
+        }
+    }
+
+    @Override public String getMasterAddress() {
+        return this.masterAddress.get();
+    }
+
+    @Override public String getHaMasterAddress() {
+        return this.masterHaAddress.get();
+    }
+
+    @Override public long getLastReadTimestamp() {
+        return this.lastReadTimestamp;
+    }
+
+    @Override public long getLastWriteTimestamp() {
+        return this.lastWriteTimestamp;
+    }
+
+    @Override public HAConnectionState getCurrentState() {
+        return this.currentState;
+    }
+
+    @Override public void changeCurrentState(HAConnectionState 
haConnectionState) {
+        LOGGER.info("change state to {}", haConnectionState);
+        this.currentState = haConnectionState;
+    }
+
+    public void closeMasterAndWait() {
+        this.closeMaster();
+        this.waitForRunning(1000 * 5);
+    }
+
+    @Override public void closeMaster() {
+        if (null != this.socketChannel) {
+            try {
+                SelectionKey sk = this.socketChannel.keyFor(this.selector);
+                if (sk != null) {
+                    sk.cancel();
+                }
+
+                this.socketChannel.close();
+                this.socketChannel = null;
+
+                LOGGER.info("AutoSwitchHAClient close connection with master 
{}", this.masterHaAddress.get());
+                this.changeCurrentState(HAConnectionState.READY);
+            } catch (IOException e) {
+                LOGGER.warn("CloseMaster exception. ", e);
+            }
+
+            this.lastReadTimestamp = 0;
+            this.processPosition = 0;
+
+            this.byteBufferRead.position(0);
+            this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
+        }
+    }
+
+    @Override public long getTransferredByteInSecond() {
+        return this.flowMonitor.getTransferredByteInSecond();
+    }
+
+    @Override public void shutdown() {
+        changeCurrentState(HAConnectionState.SHUTDOWN);
+        // Shutdown thread firstly
+        this.flowMonitor.shutdown();
+        super.shutdown();
+
+        closeMaster();
+        try {
+            this.selector.close();
+        } catch (IOException e) {
+            LOGGER.warn("Close the selector of AutoSwitchHAClient error, ", e);
+        }
+    }
+
+    private boolean isTimeToReportOffset() {
+        long interval = this.messageStore.now() - this.lastWriteTimestamp;
+        return interval > 
this.messageStore.getMessageStoreConfig().getHaSendHeartbeatInterval();
+    }
+
+    private boolean sendHandshakeHeader() {
+        this.transferHeaderBuffer.position(0);
+        this.transferHeaderBuffer.limit(TRANSFER_HEADER_SIZE);
+        
this.transferHeaderBuffer.putInt(HAConnectionState.HANDSHAKE.ordinal());
+        if 
(this.haService.getDefaultMessageStore().getMessageStoreConfig().isSyncFromLastFile())
 {
+            this.transferHeaderBuffer.putLong(SYNC_FROM_LAST_FILE);
+        } else {
+            this.transferHeaderBuffer.putLong(SYNC_FROM_FIRST_FILE);
+        }
+        this.transferHeaderBuffer.flip();
+        return this.haWriter.write(this.socketChannel, 
this.transferHeaderBuffer);
+    }
+
+    private void handshakeWithMaster() throws IOException {
+        sendHandshakeHeader();
+        boolean result = this.sendHandshakeHeader();
+        if (!result) {
+            closeMasterAndWait();
+        }
+
+        this.selector.select(5000);
+
+        result = this.haReader.read(this.socketChannel, this.byteBufferRead);
+        if (!result) {
+            closeMasterAndWait();
+            return;
+        }
+    }
+
+    private boolean reportSlaveOffset(final long offsetToReport) {
+        this.transferHeaderBuffer.position(0);
+        this.transferHeaderBuffer.limit(TRANSFER_HEADER_SIZE);
+        this.transferHeaderBuffer.putInt(this.currentState.ordinal());
+        this.transferHeaderBuffer.putLong(offsetToReport);
+        this.transferHeaderBuffer.flip();
+        return this.haWriter.write(this.socketChannel, 
this.transferHeaderBuffer);
+    }
+
+    private boolean reportSlaveMaxOffset() {
+        boolean result = true;
+        final long maxPhyOffset = this.messageStore.getMaxPhyOffset();
+        if (maxPhyOffset > this.currentReportedOffset) {
+            this.currentReportedOffset = maxPhyOffset;
+            result = reportSlaveOffset(this.currentReportedOffset);
+        }
+        return result;
+    }
+
+    public boolean connectMaster() throws ClosedChannelException {
+        if (null == this.socketChannel) {
+            String addr = this.masterHaAddress.get();
+            if (addr != null) {
+                SocketAddress socketAddress = 
RemotingUtil.string2SocketAddress(addr);
+                this.socketChannel = RemotingUtil.connect(socketAddress);
+                if (this.socketChannel != null) {
+                    this.socketChannel.register(this.selector, 
SelectionKey.OP_READ);
+                    LOGGER.info("AutoSwitchHAClient connect to master {}", 
addr);
+                    changeCurrentState(HAConnectionState.HANDSHAKE);
+                }
+            }
+            this.currentReportedOffset = this.messageStore.getMaxPhyOffset();
+            this.lastReadTimestamp = System.currentTimeMillis();
+        }
+        return this.socketChannel != null;
+    }
+
+    private boolean transferFromMaster() throws IOException {
+        boolean result;
+        if (isTimeToReportOffset()) {
+            LOGGER.info("Slave report current offset {}", 
this.currentReportedOffset);
+            result = reportSlaveOffset(this.currentReportedOffset);
+            if (!result) {
+                return false;
+            }
+        }
+
+        this.selector.select(1000);
+
+        result = this.haReader.read(this.socketChannel, this.byteBufferRead);
+        if (!result) {
+            return false;
+        }
+
+        return this.reportSlaveMaxOffset();
+    }
+
+    @Override public void run() {
+        LOGGER.info(this.getServiceName() + " service started");
+
+        this.flowMonitor.start();
+        while (!this.isStopped()) {
+            try {
+                switch (this.currentState) {
+                    case SHUTDOWN:
+                        return;
+                    case READY:
+                        // Truncate invalid msg first
+                        final long truncateOffset = 
AutoSwitchHAClient.this.haService.truncateInvalidMsg();
+                        if (truncateOffset >= 0) {
+                            
AutoSwitchHAClient.this.epochCache.truncateSuffixByOffset(truncateOffset);
+                        }
+                        if (!connectMaster()) {
+                            LOGGER.warn("AutoSwitchHAClient connect to master 
{} failed", this.masterHaAddress.get());
+                            waitForRunning(1000 * 5);
+                        }
+                        continue;
+                    case HANDSHAKE:
+                        handshakeWithMaster();
+                        continue;
+                    case TRANSFER:
+                        if (!transferFromMaster()) {
+                            closeMasterAndWait();
+                            continue;
+                        }
+                        break;
+                    case SUSPEND:
+                    default:
+                        waitForRunning(1000 * 5);
+                        continue;
+                }
+                long interval = this.messageStore.now() - 
this.lastReadTimestamp;
+                if (interval > 
this.messageStore.getMessageStoreConfig().getHaHousekeepingInterval()) {
+                    LOGGER.warn("AutoSwitchHAClient, housekeeping, found this 
connection[" + this.masterHaAddress
+                        + "] expired, " + interval);
+                    closeMaster();
+                    LOGGER.warn("AutoSwitchHAClient, master not response some 
time, so close connection");
+                }
+            } catch (Exception e) {
+                LOGGER.warn(this.getServiceName() + " service has exception. 
", e);
+                closeMasterAndWait();
+            }
+        }
+
+    }
+
+    /**
+     * Compare the master and slave's epoch file, find consistent point, do 
truncate.
+     */
+    private boolean doTruncate(List<EpochEntry> masterEpochEntries, long 
masterEndOffset) {
+        if (this.epochCache.getEntrySize() == 0) {
+            // If epochMap is empty, means the broker is a new replicas
+            LOGGER.info("Slave local epochCache is empty, skip truncate log");
+            changeCurrentState(HAConnectionState.TRANSFER);
+            this.currentReportedOffset = 0;
+        } else {
+            final EpochFileCache masterEpochCache = new EpochFileCache();
+            masterEpochCache.initCacheFromEntries(masterEpochEntries);
+            masterEpochCache.setLastEpochEntryEndOffset(masterEndOffset);
+            final List<EpochEntry> localEpochEntries = 
this.epochCache.getAllEntries();
+            final EpochFileCache localEpochCache = new EpochFileCache();
+            localEpochCache.initCacheFromEntries(localEpochEntries);
+            
localEpochCache.setLastEpochEntryEndOffset(this.messageStore.getMaxPhyOffset());
+
+            final long truncateOffset = 
localEpochCache.findConsistentPoint(masterEpochCache);
+            if (truncateOffset < 0) {
+                // If truncateOffset < 0, means we can't find a consistent 
point
+                LOGGER.error("Failed to find a consistent point between 
masterEpoch:{} and slaveEpoch:{}", masterEpochEntries, localEpochEntries);
+                return false;
+            }
+            if (!this.messageStore.truncateFiles(truncateOffset)) {
+                LOGGER.error("Failed to truncate slave log to {}", 
truncateOffset);
+                return false;
+            }
+            this.epochCache.truncateSuffixByOffset(truncateOffset);
+            LOGGER.info("Truncate slave log to {} success, change to transfer 
state", truncateOffset);
+            changeCurrentState(HAConnectionState.TRANSFER);
+            this.currentReportedOffset = truncateOffset;
+        }
+        if (!reportSlaveMaxOffset()) {
+            LOGGER.error("AutoSwitchHAClient report max offset to master 
failed");
+            return false;
+        }
+        return true;
+    }
+
+    class HAClientReader extends AbstractHAReader {
+
+        @Override
+        protected boolean processReadResult(ByteBuffer byteBufferRead) {
+            int readSocketPos = byteBufferRead.position();
+
+            while (true) {
+                int diff = byteBufferRead.position() - 
AutoSwitchHAClient.this.processPosition;
+                if (diff >= AutoSwitchHAConnection.MSG_HEADER_SIZE) {
+                    int processPosition =  
AutoSwitchHAClient.this.processPosition;
+                    int masterState = byteBufferRead.getInt(processPosition);
+                    int bodySize = byteBufferRead.getInt(processPosition + 4);
+                    long masterOffset = byteBufferRead.getLong(processPosition 
+ 4 + 4);
+                    int masterEpoch = byteBufferRead.getInt(processPosition + 
4 + 4 + 8);
+                    long masterEpochStartOffset = 
byteBufferRead.getLong(processPosition + 4 + 4 + 8 + 4);
+                    long confirmOffset = 
byteBufferRead.getLong(processPosition + 4 + 4 + 8 + 4 + 8);
+
+                    if (masterState != 
AutoSwitchHAClient.this.currentState.ordinal()) {
+                        AutoSwitchHAClient.this.processPosition += 
AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize;
+                        AutoSwitchHAClient.this.waitForRunning(1);
+                        LOGGER.error("State not matched, masterState:{}, 
slaveState:{}, bodySize:{}, offset:{}, masterEpoch:{}, 
masterEpochStartOffset:{}, confirmOffset:{}",
+                            masterState, AutoSwitchHAClient.this.currentState, 
bodySize, masterOffset, masterEpoch, masterEpochStartOffset, confirmOffset);
+                        return true;
+                    }
+                    LOGGER.info("Receive master msg, masterState:{}, 
bodySize:{}, offset:{}, masterEpoch:{}, masterEpochStartOffset:{}, 
confirmOffset:{}",
+                        HAConnectionState.values()[masterState], bodySize, 
masterOffset, masterEpoch, masterEpochStartOffset, confirmOffset);
+
+                    if (diff >= (AutoSwitchHAConnection.MSG_HEADER_SIZE + 
bodySize)) {
+                        switch (AutoSwitchHAClient.this.currentState) {
+                            case HANDSHAKE:
+                                AutoSwitchHAClient.this.processPosition += 
AutoSwitchHAConnection.MSG_HEADER_SIZE;
+                                // Truncate log
+                                int entrySize = 
AutoSwitchHAConnection.EPOCH_ENTRY_SIZE;
+                                final int entryNums = bodySize / entrySize;
+                                final ArrayList<EpochEntry> epochEntries = new 
ArrayList<>(entryNums);
+                                for (int i = 0; i < entryNums; i++) {
+                                    int epoch = 
byteBufferRead.getInt(AutoSwitchHAClient.this.processPosition + i * entrySize);
+                                    long startOffset = 
byteBufferRead.getLong(AutoSwitchHAClient.this.processPosition + i * entrySize 
+ 4);
+                                    epochEntries.add(new EpochEntry(epoch, 
startOffset));
+                                }
+                                byteBufferRead.position(readSocketPos);
+                                AutoSwitchHAClient.this.processPosition += 
bodySize;
+                                LOGGER.info("Receive handshake, 
masterMaxPosition {}, masterEpochEntries:{}, try truncate log", masterOffset, 
epochEntries);
+                                if (!doTruncate(epochEntries, masterOffset)) {
+                                    waitForRunning(1000 * 2);
+                                    LOGGER.error("AutoSwitchHAClient truncate 
log failed in handshake state");
+                                    return false;
+                                }
+                                break;
+                            case TRANSFER:
+                                byte[] bodyData = new byte[bodySize];
+                                
byteBufferRead.position(AutoSwitchHAClient.this.processPosition + 
AutoSwitchHAConnection.MSG_HEADER_SIZE);
+                                byteBufferRead.get(bodyData);
+                                byteBufferRead.position(readSocketPos);
+                                AutoSwitchHAClient.this.processPosition += 
AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize;
+
+                                long slavePhyOffset = 
AutoSwitchHAClient.this.messageStore.getMaxPhyOffset();
+                                if (slavePhyOffset != 0) {
+                                    if (slavePhyOffset != masterOffset) {
+                                        LOGGER.error("master pushed offset not 
equal the max phy offset in slave, SLAVE: "
+                                            + slavePhyOffset + " MASTER: " + 
masterOffset);
+                                        return false;
+                                    }
+                                }
+
+                                // If epoch changed
+                                if (masterEpoch != 
AutoSwitchHAClient.this.currentReceivedEpoch) {
+                                    
AutoSwitchHAClient.this.currentReceivedEpoch = masterEpoch;
+                                    
AutoSwitchHAClient.this.epochCache.appendEntry(new EpochEntry(masterEpoch, 
masterEpochStartOffset));
+                                }
+                                AutoSwitchHAClient.this.confirmOffset = 
Math.min(confirmOffset, messageStore.getMaxPhyOffset());
+
+                                if (bodySize > 0) {
+                                    final DefaultMessageStore messageStore = 
AutoSwitchHAClient.this.messageStore;
+                                    if 
(messageStore.appendToCommitLog(masterOffset, bodyData, 0, bodyData.length)) {
+                                        LOGGER.info("Slave append master log 
success, from {}, size {}, epoch:{}", masterOffset, bodySize, masterEpoch);
+                                    }

Review Comment:
   这里不建议打日志,这个操作太频繁了,会产生大量垃圾日志



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to