RongtongJin commented on code in PR #4236:
URL: https://github.com/apache/rocketmq/pull/4236#discussion_r863946877


##########
common/src/main/java/org/apache/rocketmq/common/EpochEntry.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common;
+
+public class EpochEntry {
+
+    private final int epoch;
+    private final long startOffset;
+    private long endOffset = Long.MAX_VALUE;

Review Comment:
   This also needs to be changed to -1. And once it is -1 here, doesn't the check below for whether the entry is the last one become unnecessary?
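   A minimal sketch of what this suggestion could look like, assuming -1 replaces Long.MAX_VALUE as the "still open" sentinel (the constructor and accessors here are illustrative, not copied from the patch):

   ```java
   // Hypothetical EpochEntry using -1 as the open-ended endOffset sentinel.
   public class EpochEntry {

       private final int epoch;
       private final long startOffset;
       // -1 means this epoch has no end offset assigned yet (it is the newest epoch).
       private long endOffset = -1;

       public EpochEntry(int epoch, long startOffset) {
           this.epoch = epoch;
           this.startOffset = startOffset;
       }

       public int getEpoch() {
           return epoch;
       }

       public long getStartOffset() {
           return startOffset;
       }

       public long getEndOffset() {
           return endOffset;
       }

       public void setEndOffset(long endOffset) {
           this.endOffset = endOffset;
       }
   }
   ```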



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/EpochFileCache.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.CheckpointFile;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+/**
+ * Cache for epochFile.
+ * Mapping (Epoch -> StartOffset)
+ */
+public class EpochFileCache {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    private final Lock readLock = this.readWriteLock.readLock();
+    private final Lock writeLock = this.readWriteLock.writeLock();
+    private final TreeMap<Integer, EpochEntry> epochMap;
+    private CheckpointFile<EpochEntry> checkpoint;
+
+    public EpochFileCache() {
+        this.epochMap = new TreeMap<>();
+    }
+
+    public EpochFileCache(final String path) {
+        this.epochMap = new TreeMap<>();
+        this.checkpoint = new CheckpointFile<>(path, new EpochEntrySerializer());
+    }
+
+    public boolean initCacheFromFile() {
+        this.writeLock.lock();
+        try {
+            final List<EpochEntry> entries = this.checkpoint.read();
+            initEntries(entries);
+            return true;
+        } catch (final IOException e) {
+            log.error("Error happen when init epoch entries from epochFile", 
e);
+            return false;
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    public void initCacheFromEntries(final List<EpochEntry> entries) {
+        this.writeLock.lock();
+        try {
+            initEntries(entries);
+            flush();
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    private void initEntries(final List<EpochEntry> entries) {
+        this.epochMap.clear();
+        EpochEntry preEntry = null;
+        for (final EpochEntry entry : entries) {
+            this.epochMap.put(entry.getEpoch(), entry);
+            if (preEntry != null) {
+                preEntry.setEndOffset(entry.getStartOffset());
+            }
+            preEntry = entry;
+        }
+    }
+
+    public boolean appendEntry(final EpochEntry entry) {
+        this.writeLock.lock();
+        try {
+            if (!this.epochMap.isEmpty()) {
+                final EpochEntry lastEntry = this.epochMap.lastEntry().getValue();
+                if (lastEntry.getEpoch() >= entry.getEpoch() || lastEntry.getStartOffset() >= entry.getStartOffset()) {
+                    log.error("The appending entry's lastEpoch or endOffset {} is not bigger than lastEntry {}, append failed", entry, lastEntry);
+                    return false;
+                }
+                lastEntry.setEndOffset(entry.getStartOffset());
+            }
+            this.epochMap.put(entry.getEpoch(), new EpochEntry(entry));
+            flush();
+            return true;
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    /**
+     * Set endOffset for lastEpochEntry.
+     */
+    public void setLastEpochEntryEndOffset(final long endOffset) {
+        this.writeLock.lock();
+        try {
+            if (!this.epochMap.isEmpty()) {
+                final EpochEntry lastEntry = this.epochMap.lastEntry().getValue();
+                if (lastEntry.getStartOffset() <= endOffset) {
+                    lastEntry.setEndOffset(endOffset);
+                }
+            }
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    public EpochEntry firstEntry() {
+        this.readLock.lock();
+        try {
+            if (this.epochMap.isEmpty()) {
+                return null;
+            }
+            return new EpochEntry(this.epochMap.firstEntry().getValue());
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public EpochEntry lastEntry() {
+        this.readLock.lock();
+        try {
+            if (this.epochMap.isEmpty()) {
+                return null;
+            }
+            return new EpochEntry(this.epochMap.lastEntry().getValue());
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public int lastEpoch() {
+        final EpochEntry entry = lastEntry();
+        if (entry != null) {
+            return entry.getEpoch();
+        }
+        return -1;
+    }
+
+    public EpochEntry getEntry(final int epoch) {
+        this.readLock.lock();
+        try {
+            if (this.epochMap.containsKey(epoch)) {
+                final EpochEntry entry = this.epochMap.get(epoch);
+                return new EpochEntry(entry);
+            }
+            return null;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public EpochEntry findEpochEntryByOffset(final long offset) {
+        this.readLock.lock();
+        try {
+            if (!this.epochMap.isEmpty()) {
+                for (Map.Entry<Integer, EpochEntry> entry : this.epochMap.entrySet()) {
+                    if (entry.getValue().getStartOffset() <= offset && entry.getValue().getEndOffset() > offset) {
+                        return new EpochEntry(entry.getValue());
+                    }
+                }
+            }
+            return null;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public EpochEntry nextEntry(final int epoch) {
+        this.readLock.lock();
+        try {
+            final Map.Entry<Integer, EpochEntry> entry = this.epochMap.ceilingEntry(epoch + 1);
+            if (entry != null) {
+                return new EpochEntry(entry.getValue());
+            }
+            return null;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public List<EpochEntry> getAllEntries() {
+        this.readLock.lock();
+        try {
+            final ArrayList<EpochEntry> result = new ArrayList<>(this.epochMap.size());
+            this.epochMap.forEach((key, value) -> result.add(new EpochEntry(value)));
+            return result;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    /**
+     * Find the consistentPoint between compareCache and local.
+     *
+     * @return the consistent offset
+     */
+    public long findConsistentPoint(final EpochFileCache compareCache) {
+        this.readLock.lock();
+        try {
+            long consistentOffset = -1;
+            final Map<Integer, EpochEntry> descendingMap = new TreeMap<>(this.epochMap).descendingMap();
+            final Iterator<Map.Entry<Integer, EpochEntry>> iter = descendingMap.entrySet().iterator();
+            while (iter.hasNext()) {
+                final Map.Entry<Integer, EpochEntry> curLocalEntry = iter.next();
+                final EpochEntry compareEntry = compareCache.getEntry(curLocalEntry.getKey());
+                if (compareEntry != null && compareEntry.getStartOffset() == curLocalEntry.getValue().getStartOffset()) {
+                    consistentOffset = Math.min(curLocalEntry.getValue().getEndOffset(), compareEntry.getEndOffset());
+                    break;
+                }
+            }
+            return consistentOffset;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    /**
+     * Remove epochEntries with epoch >= truncateEpoch.
+     */
+    public void truncateFromEpoch(final int truncateEpoch) {
+        doTruncate((entry) -> entry.getEpoch() >= truncateEpoch);
+    }
+
+    /**
+     * Remove epochEntries with startOffset >= truncateOffset.
+     */
+    public void truncateFromOffset(final long truncateOffset) {
+        doTruncate((entry) -> entry.getStartOffset() >= truncateOffset);
+    }
+
+    /**
+     * Clear all epochEntries
+     */
+    public void clearAll() {
+        doTruncate((entry) -> true);
+    }
+
+    private void doTruncate(final Predicate<EpochEntry> predict) {
+        this.writeLock.lock();
+        try {
+            this.epochMap.entrySet().removeIf(entry -> predict.test(entry.getValue()));
+            final EpochEntry entry = lastEntry();
+            if (entry != null) {
+                entry.setEndOffset(Long.MAX_VALUE);
+            }
+            flush();
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    private void flush() {
+        this.writeLock.lock();
+        try {
+            try {
+                if (this.checkpoint != null) {
+                    final ArrayList<EpochEntry> entries = new ArrayList<>(this.epochMap.values());
+                    this.checkpoint.write(entries);
+                }
+            } catch (final IOException e) {
+                log.error("Error happen when flush epochEntries to epochCheckpointFile", e);
+            }
+        } finally {
+            this.writeLock.unlock();
+        }

Review Comment:
   A single try-catch is enough here; there is no need for two.
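   A sketch of flush() flattened into one try/catch/finally, assuming the IOException handling and the write-lock usage stay as in the patch:

   ```java
   // Hypothetical single-try version of flush(): one try protects the write,
   // the catch logs the IOException, and finally still releases the lock.
   private void flush() {
       this.writeLock.lock();
       try {
           if (this.checkpoint != null) {
               final ArrayList<EpochEntry> entries = new ArrayList<>(this.epochMap.values());
               this.checkpoint.write(entries);
           }
       } catch (final IOException e) {
           log.error("Error happen when flush epochEntries to epochCheckpointFile", e);
       } finally {
           this.writeLock.unlock();
       }
   }
   ```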



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java:
##########
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.List;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.ha.FlowMonitor;
+import org.apache.rocketmq.store.ha.HAConnection;
+import org.apache.rocketmq.store.ha.HAConnectionState;
+import org.apache.rocketmq.store.ha.io.AbstractHAReader;
+import org.apache.rocketmq.store.ha.io.HAWriter;
+
+public class AutoSwitchHAConnection implements HAConnection {
+    /**
+     * Header protocol in syncing msg from master.
+     * Format: current state + body size + offset + epoch + additionalInfo(confirmOffset).
+     * If the msg is a handshakeMsg, the body size = EpochEntrySize * EpochEntryNums, and the offset is the maxOffset in the master.
+     */
+    public static final int MSG_HEADER_SIZE = 4 + 4 + 8 + 4 + 8;
+    public static final int EPOCH_ENTRY_SIZE = 12;
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private final AutoSwitchHAService haService;
+    private final SocketChannel socketChannel;
+    private final String clientAddress;
+    private final EpochFileCache epochCache;
+    private final AbstractWriteSocketService writeSocketService;
+    private final ReadSocketService readSocketService;
+    private volatile HAConnectionState currentState = HAConnectionState.HANDSHAKE;
+    private volatile long slaveRequestOffset = -1;
+    private volatile long slaveAckOffset = -1;
+    /**
+     * Whether the slave has already sent a handshake message
+     */
+    private volatile boolean isSlaveSendHandshake = false;
+    private volatile int currentTransferEpoch = -1;
+    private volatile long currentTransferEpochEndOffset = 0;
+    private final FlowMonitor flowMonitor;
+
+    public AutoSwitchHAConnection(AutoSwitchHAService haService, SocketChannel socketChannel,
+        EpochFileCache epochCache) throws IOException {
+        this.haService = haService;
+        this.socketChannel = socketChannel;
+        this.epochCache = epochCache;
+        this.clientAddress = this.socketChannel.socket().getRemoteSocketAddress().toString();
+        this.socketChannel.configureBlocking(false);
+        this.socketChannel.socket().setSoLinger(false, -1);
+        this.socketChannel.socket().setTcpNoDelay(true);
+        this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
+        this.socketChannel.socket().setSendBufferSize(1024 * 64);
+        this.writeSocketService = new WriteSocketService(this.socketChannel);
+        this.readSocketService = new ReadSocketService(this.socketChannel);
+        this.haService.getConnectionCount().incrementAndGet();
+        this.flowMonitor = new FlowMonitor(haService.getDefaultMessageStore().getMessageStoreConfig());
+    }
+
+    @Override public void start() {
+        changeCurrentState(HAConnectionState.HANDSHAKE);
+        this.flowMonitor.start();
+        this.readSocketService.start();
+        this.writeSocketService.start();
+    }
+
+    @Override public void shutdown() {
+        changeCurrentState(HAConnectionState.SHUTDOWN);
+        this.flowMonitor.shutdown(true);
+        this.writeSocketService.shutdown(true);
+        this.readSocketService.shutdown(true);
+        this.close();
+    }
+
+    @Override public void close() {
+        if (this.socketChannel != null) {
+            try {
+                this.socketChannel.close();
+            } catch (final IOException e) {
+                LOGGER.error("", e);
+            }
+        }
+    }
+
+    public void changeCurrentState(HAConnectionState connectionState) {
+        LOGGER.info("change state to {}", connectionState);
+        this.currentState = connectionState;
+    }
+
+    @Override public HAConnectionState getCurrentState() {
+        return currentState;
+    }
+
+    @Override public SocketChannel getSocketChannel() {
+        return socketChannel;
+    }
+
+    @Override public String getClientAddress() {
+        return clientAddress;
+    }
+
+    @Override public long getSlaveAckOffset() {
+        return slaveAckOffset;
+    }
+
+    @Override public long getTransferredByteInSecond() {
+        return flowMonitor.getTransferredByteInSecond();
+    }
+
+    @Override public long getTransferFromWhere() {
+        return this.writeSocketService.getNextTransferFromWhere();
+    }
+
+    private void changeTransferEpochToNext(final EpochEntry entry) {
+        this.currentTransferEpoch = entry.getEpoch();
+        this.currentTransferEpochEndOffset = entry.getEndOffset();
+        if (entry.getEpoch() == this.epochCache.lastEpoch()) {
+            // Use -1 to stand for +∞
+            this.currentTransferEpochEndOffset = -1;
+        }
+    }
+
+    class ReadSocketService extends ServiceThread {
+        private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;
+        private final Selector selector;
+        private final SocketChannel socketChannel;
+        private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
+        private final AbstractHAReader haReader;
+        private int processPosition = 0;
+        private volatile long lastReadTimestamp = System.currentTimeMillis();
+
+        public ReadSocketService(final SocketChannel socketChannel) throws IOException {
+            this.selector = RemotingUtil.openSelector();
+            this.socketChannel = socketChannel;
+            this.socketChannel.register(this.selector, SelectionKey.OP_READ);
+            this.setDaemon(true);
+            haReader = new HAServerReader();
+            haReader.registerHook(readSize -> {
+                if (readSize > 0) {
+                    ReadSocketService.this.lastReadTimestamp =
+                        haService.getDefaultMessageStore().getSystemClock().now();
+                }
+            });
+        }
+
+        @Override
+        public void run() {
+            LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.selector.select(1000);
+                    boolean ok = this.haReader.read(this.socketChannel, this.byteBufferRead);
+                    if (!ok) {
+                        AutoSwitchHAConnection.LOGGER.error("processReadEvent error");
+                        break;
+                    }
+
+                    long interval = haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
+                    if (interval > haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
+                        LOGGER.warn("ha housekeeping, found this connection[" + clientAddress + "] expired, " + interval);
+                        break;
+                    }
+                } catch (Exception e) {
+                    AutoSwitchHAConnection.LOGGER.error(this.getServiceName() + " service has exception.", e);
+                    break;
+                }
+            }
+
+            this.makeStop();
+
+            changeCurrentState(HAConnectionState.SHUTDOWN);
+
+            writeSocketService.makeStop();
+
+            haService.removeConnection(AutoSwitchHAConnection.this);
+
+            haService.getConnectionCount().decrementAndGet();
+
+            SelectionKey sk = this.socketChannel.keyFor(this.selector);
+            if (sk != null) {
+                sk.cancel();
+            }
+
+            try {
+                this.selector.close();
+                this.socketChannel.close();
+            } catch (IOException e) {
+                AutoSwitchHAConnection.LOGGER.error("", e);
+            }
+
+            AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            return ReadSocketService.class.getSimpleName();
+        }
+
+        class HAServerReader extends AbstractHAReader {
+            @Override
+            protected boolean processReadResult(ByteBuffer byteBufferRead) {
+                while (true) {
+                    int diff = byteBufferRead.position() - ReadSocketService.this.processPosition;
+                    if (diff >= AutoSwitchHAClient.TRANSFER_HEADER_SIZE) {
+                        int readPosition = ReadSocketService.this.processPosition;
+                        HAConnectionState slaveState = HAConnectionState.values()[byteBufferRead.getInt(readPosition)];
+
+                        switch (slaveState) {
+                            case HANDSHAKE:
+                                isSlaveSendHandshake = true;
+                                ReadSocketService.this.processPosition += AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
+                                break;
+                            case TRANSFER:
+                                long slaveMaxOffset = byteBufferRead.getLong(readPosition + 4);
+                                ReadSocketService.this.processPosition += AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
+
+                                slaveAckOffset = slaveMaxOffset;
+                                if (slaveRequestOffset < 0) {
+                                    slaveRequestOffset = slaveMaxOffset;
+                                }
+                                LOGGER.info("slave[" + clientAddress + "] 
request offset " + slaveMaxOffset);
+                                
AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset);
+                                break;
+                            default:
+                                LOGGER.error("Current state illegal {}", 
currentState);
+                                break;
+                        }
+
+                        if (!slaveState.equals(currentState)) {
+                            LOGGER.warn("Master change state from {} to {}", 
currentState, slaveState);
+                            changeCurrentState(slaveState);
+                        }
+                        continue;
+                    }
+
+                    if (!byteBufferRead.hasRemaining()) {
+                        byteBufferRead.position(ReadSocketService.this.processPosition);
+                        byteBufferRead.compact();
+                        ReadSocketService.this.processPosition = 0;
+                    }
+                    break;
+                }
+
+                return true;
+            }
+        }
+    }
+
+    class WriteSocketService extends AbstractWriteSocketService {
+        private SelectMappedBufferResult selectMappedBufferResult;
+
+        public WriteSocketService(final SocketChannel socketChannel) throws IOException {
+            super(socketChannel);
+        }
+
+        @Override
+        protected int getNextTransferDataSize() {
+            SelectMappedBufferResult selectResult = haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
+            if (selectResult == null || selectResult.getSize() <= 0) {
+                return 0;
+            }
+            this.selectMappedBufferResult = selectResult;
+            return selectResult.getSize();
+        }
+
+        @Override
+        protected void releaseData() {
+            this.selectMappedBufferResult.release();
+            this.selectMappedBufferResult = null;
+        }
+
+        @Override
+        protected boolean transferData(int maxTransferSize) {
+
+            if (null != this.selectMappedBufferResult && maxTransferSize >= 0) {
+                this.selectMappedBufferResult.getByteBuffer().limit(maxTransferSize);
+            }
+
+            // Write Header
+            boolean result = haWriter.write(this.socketChannel, this.byteBufferHeader);
+
+            if (!result) {
+                return false;
+            }
+
+            if (null == this.selectMappedBufferResult) {
+                return true;
+            }
+
+            // Write Body
+            result = haWriter.write(this.socketChannel, this.selectMappedBufferResult.getByteBuffer());
+
+            if (result) {
+                releaseData();
+            }
+            return result;
+        }
+
+        @Override
+        protected void onStop() {
+            if (this.selectMappedBufferResult != null) {
+                this.selectMappedBufferResult.release();
+            }
+        }
+
+        @Override
+        public String getServiceName() {
+            if (haService.getDefaultMessageStore().getBrokerConfig().isInBrokerContainer()) {
+                return haService.getDefaultMessageStore().getBrokerConfig().getLoggerIdentifier() + WriteSocketService.class.getSimpleName();
+            }
+            return WriteSocketService.class.getSimpleName();
+        }
+    }
+
+    abstract class AbstractWriteSocketService extends ServiceThread {
+        protected final Selector selector;
+        protected final SocketChannel socketChannel;
+        protected final HAWriter haWriter;
+
+        protected final ByteBuffer byteBufferHeader = ByteBuffer.allocate(MSG_HEADER_SIZE);
+        // Store master epochFileCache: (Epoch + startOffset) * 1000
+        private final ByteBuffer handShakeBuffer = ByteBuffer.allocate(EPOCH_ENTRY_SIZE * 1000);
+        protected long nextTransferFromWhere = -1;
+        protected boolean lastWriteOver = true;
+        protected long lastWriteTimestamp = System.currentTimeMillis();
+        protected long lastPrintTimestamp = System.currentTimeMillis();
+        protected long transferOffset = 0;
+
+        public AbstractWriteSocketService(final SocketChannel socketChannel) throws IOException {
+            this.selector = RemotingUtil.openSelector();
+            this.socketChannel = socketChannel;
+            this.socketChannel.register(this.selector, SelectionKey.OP_WRITE);
+            this.setDaemon(true);
+            haWriter = new HAWriter();
+            haWriter.registerHook(writeSize -> {
+                flowMonitor.addByteCountTransferred(writeSize);
+                if (writeSize > 0) {
+                    AbstractWriteSocketService.this.lastWriteTimestamp =
+                        haService.getDefaultMessageStore().getSystemClock().now();
+                }
+            });
+        }
+
+        public long getNextTransferFromWhere() {
+            return this.nextTransferFromWhere;
+        }
+
+        // Handshake method
+        private boolean buildHandshakeBuffer() {
+            final List<EpochEntry> epochEntries = AutoSwitchHAConnection.this.epochCache.getAllEntries();
+            final int lastEpoch = AutoSwitchHAConnection.this.epochCache.lastEpoch();
+            final long maxPhyOffset = AutoSwitchHAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset();
+            this.byteBufferHeader.position(0);
+            this.byteBufferHeader.limit(MSG_HEADER_SIZE);
+            // State
+            this.byteBufferHeader.putInt(currentState.ordinal());
+            // Body size
+            this.byteBufferHeader.putInt(epochEntries.size() * EPOCH_ENTRY_SIZE);
+            // Offset
+            this.byteBufferHeader.putLong(maxPhyOffset);
+            // Epoch
+            this.byteBufferHeader.putInt(lastEpoch);
+            // Additional info
+            this.byteBufferHeader.putLong(0L);
+            this.byteBufferHeader.flip();
+
+            // EpochEntries
+            this.handShakeBuffer.position(0);
+            this.handShakeBuffer.limit(EPOCH_ENTRY_SIZE * epochEntries.size());
+            for (final EpochEntry entry : epochEntries) {
+                if (entry != null) {
+                    this.handShakeBuffer.putInt(entry.getEpoch());
+                    this.handShakeBuffer.putLong(entry.getStartOffset());
+                }
+            }
+            this.handShakeBuffer.flip();
+            LOGGER.info("Master build handshake header: maxEpoch:{}, 
maxOffset:{}, epochEntries:{}", lastEpoch, maxPhyOffset, epochEntries);
+            return true;
+        }
+
+        private boolean handshakeWithSlave() {
+            // Write Header
+            boolean result = this.haWriter.write(this.socketChannel, this.byteBufferHeader);
+
+            if (!result) {
+                return false;
+            }
+
+            // Write Body
+            return this.haWriter.write(this.socketChannel, this.handShakeBuffer);
+        }
+
+        // Normal transfer method
+        private void buildTransferHeaderBuffer(long nextOffset, int bodySize) {
+            // Build Header
+            this.byteBufferHeader.position(0);
+            this.byteBufferHeader.limit(MSG_HEADER_SIZE);
+            // State
+            this.byteBufferHeader.putInt(currentState.ordinal());
+            // Body size
+            this.byteBufferHeader.putInt(bodySize);
+            // Offset
+            this.byteBufferHeader.putLong(nextOffset);
+            // Epoch
+            this.byteBufferHeader.putInt(AutoSwitchHAConnection.this.currentTransferEpoch);
+            // Additional info(confirm offset)
+            final long confirmOffset = AutoSwitchHAConnection.this.haService.getConfirmOffset();
+            this.byteBufferHeader.putLong(confirmOffset);
+            this.byteBufferHeader.flip();
+            LOGGER.info("Master send msg, state:{}, size:{}, offset:{}, epoch:{}, confirmOffset:{}",
+                currentState, bodySize, nextOffset, AutoSwitchHAConnection.this.currentTransferEpoch, confirmOffset);
+        }
+
+        private void transferToSlave() throws Exception {
+            if (this.lastWriteOver) {
+                long interval =
+                    haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
+
+                if (interval > haService.getDefaultMessageStore().getMessageStoreConfig()
+                    .getHaSendHeartbeatInterval()) {
+
+                    buildTransferHeaderBuffer(this.nextTransferFromWhere, 0);
+
+                    this.lastWriteOver = this.transferData(0);
+                    if (!this.lastWriteOver) {
+                        return;
+                    }
+                }
+            } else {
+                // maxTransferSize == -1 means to continue transfer remaining data.
+                this.lastWriteOver = this.transferData(-1);
+                if (!this.lastWriteOver) {
+                    return;
+                }
+            }
+
+            int size = this.getNextTransferDataSize();
+            if (size > 0) {
+                if (size > haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
+                    size = haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
+                }
+                int canTransferMaxBytes = flowMonitor.canTransferMaxByteNum();
+                if (size > canTransferMaxBytes) {
+                    if (System.currentTimeMillis() - lastPrintTimestamp > 1000) {
+                        LOGGER.warn("Trigger HA flow control, max transfer speed {}KB/s, current speed: {}KB/s",
+                            String.format("%.2f", flowMonitor.maxTransferByteInSecond() / 1024.0),
+                            String.format("%.2f", flowMonitor.getTransferredByteInSecond() / 1024.0));
+                        lastPrintTimestamp = System.currentTimeMillis();
+                    }
+                    size = canTransferMaxBytes;
+                }
+                if (size <= 0) {
+                    this.releaseData();
+                    this.waitForRunning(100);
+                    return;
+                }
+
+                // We must ensure that the transmitted logs are within the same epoch
+                // If currentEpochEndOffset == -1, means that currentTransferEpoch = last epoch, so the endOffset = +∞.
+                final long currentEpochEndOffset = AutoSwitchHAConnection.this.currentTransferEpochEndOffset;
+                if (currentEpochEndOffset != -1 && this.nextTransferFromWhere + size > currentEpochEndOffset) {
+                    final EpochEntry epochEntry = AutoSwitchHAConnection.this.epochCache.nextEntry(AutoSwitchHAConnection.this.currentTransferEpoch);
+                    if (epochEntry == null) {
+                        LOGGER.error("Can't find a bigger epochEntry than epoch {}", AutoSwitchHAConnection.this.currentTransferEpoch);
+                        waitForRunning(100);
+                        return;
+                    }
+                    size = (int) (currentEpochEndOffset - this.nextTransferFromWhere);
+                    changeTransferEpochToNext(epochEntry);
+                }

Review Comment:
   One thing to watch out for here: is it possible that the master-slave connection stays up while the master's epoch increases (this normally would not happen, but it could be caused by manual operations)? In that case currentTransferEpochEndOffset would still belong to the previous epoch and thus be -1, which could corrupt the slave's epoch map.
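   One possible guard for that scenario, sketched against the fields used above (refreshTransferEpochIfStale is a hypothetical helper, not part of this PR): before sizing a batch, re-check whether currentTransferEpoch is still the cache's last epoch and, if not, reload its real end offset from the epoch cache.

   ```java
   // Hypothetical defensive check: if the master's epoch advanced while this
   // connection kept transferring, an endOffset of -1 no longer means
   // "last epoch", so re-read the real end offset from the epoch cache.
   private void refreshTransferEpochIfStale() {
       final int lastEpoch = this.epochCache.lastEpoch();
       if (this.currentTransferEpochEndOffset == -1 && this.currentTransferEpoch != lastEpoch) {
           final EpochEntry entry = this.epochCache.getEntry(this.currentTransferEpoch);
           if (entry != null) {
               // appendEntry() already sealed this epoch with the next epoch's startOffset.
               this.currentTransferEpochEndOffset = entry.getEndOffset();
           }
       }
   }
   ```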



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
