Github user lvfangmin commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/628#discussion_r239578960 --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java --- @@ -0,0 +1,514 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.quorum; + +import org.apache.zookeeper.jmx.MBeanRegistry; +import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.ZKDatabase; + +import java.io.BufferedInputStream; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Used by Followers to host Observers. This reduces the network load on the Leader process by pushing + * the responsibility for keeping Observers in sync off the leading peer. + * + * It is expected that Observers will continue to perform the initial vetting of clients and requests. + * Observers send the request to the follower where it is received by an ObserverMaster. + * + * The ObserverMaster forwards a copy of the request to the ensemble Leader and inserts it into its own + * request processor pipeline where it can be matched with the response comes back. All commits received + * from the Leader will be forwarded along to every Learner connected to the ObserverMaster. + * + * New Learners connecting to a Follower will receive a LearnerHandler object and be party to its syncing logic + * to be brought up to date. + * + * The logic is quite a bit simpler than the corresponding logic in Leader because it only hosts observers. 
+ */ +public class ObserverMaster implements LearnerMaster, Runnable { + private static final Logger LOG = LoggerFactory.getLogger(ObserverMaster.class); + + //Follower counter + private final AtomicLong followerCounter = new AtomicLong(-1); + + private QuorumPeer self; + private FollowerZooKeeperServer zks; + private int port; + + private Set<LearnerHandler> activeObservers = Collections.newSetFromMap(new ConcurrentHashMap<LearnerHandler,Boolean>()); + + private final ConcurrentHashMap<LearnerHandler, LearnerHandlerBean> connectionBeans = new ConcurrentHashMap<>(); + + /** + * we want to keep a log of past txns so that observers can sync up with us when we connect, + * but we can't keep everything in memory, so this limits how much memory will be dedicated + * to keeping recent txns. + */ + private final static int PKTS_SIZE_LIMIT = 32 * 1024 * 1024; + private static volatile int pktsSizeLimit = Integer.getInteger("zookeeper.observerMaster.sizeLimit", PKTS_SIZE_LIMIT); + private ConcurrentLinkedQueue<QuorumPacket> proposedPkts = new ConcurrentLinkedQueue<>(); + private ConcurrentLinkedQueue<QuorumPacket> committedPkts = new ConcurrentLinkedQueue<>(); + private int pktsSize = 0; + + private long lastProposedZxid; + + // ensure ordering of revalidations returned to this learner + private final Object revalidateSessionLock = new Object(); + + // Throttle when there are too many concurrent snapshots being sent to observers + private static final String MAX_CONCURRENT_SNAPSHOTS = "zookeeper.leader.maxConcurrentSnapshots"; + private static final int maxConcurrentSnapshots; + + private static final String MAX_CONCURRENT_DIFFS = "zookeeper.leader.maxConcurrentDiffs"; + private static final int maxConcurrentDiffs; + static { + maxConcurrentSnapshots = Integer.getInteger(MAX_CONCURRENT_SNAPSHOTS, 10); + LOG.info(MAX_CONCURRENT_SNAPSHOTS + " = " + maxConcurrentSnapshots); + + maxConcurrentDiffs = Integer.getInteger(MAX_CONCURRENT_DIFFS, 100); + LOG.info(MAX_CONCURRENT_DIFFS 
+ " = " + maxConcurrentDiffs); + } + + private final ConcurrentLinkedQueue<Revalidation> pendingRevalidations = new ConcurrentLinkedQueue<>(); + static class Revalidation { + public final long sessionId; + public final int timeout; + public final LearnerHandler handler; + + Revalidation(final Long sessionId, final int timeout, final LearnerHandler handler) { + this.sessionId = sessionId; + this.timeout = timeout; + this.handler = handler; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + final Revalidation that = (Revalidation) o; + return sessionId == that.sessionId && timeout == that.timeout && handler.equals(that.handler); + } + + @Override + public int hashCode() { + int result = (int) (sessionId ^ (sessionId >>> 32)); + result = 31 * result + timeout; + result = 31 * result + handler.hashCode(); + return result; + } + } + + private final LearnerSnapshotThrottler learnerSnapshotThrottler = + new LearnerSnapshotThrottler(maxConcurrentSnapshots); + + private Thread thread; + private ServerSocket ss; + private boolean listenerRunning; + private ScheduledExecutorService pinger; + + Runnable ping = new Runnable() { + @Override + public void run() { + for (LearnerHandler lh: activeObservers) { + lh.ping(); + } + } + }; + + ObserverMaster(QuorumPeer self, FollowerZooKeeperServer zks, int port) { + this.self = self; + this.zks = zks; + this.port = port; + } + + @Override + public void addLearnerHandler(LearnerHandler learnerHandler) { + if (!listenerRunning) { + throw new RuntimeException(("ObserverMaster is not running")); + } + } + + @Override + public void removeLearnerHandler(LearnerHandler learnerHandler) { + activeObservers.remove(learnerHandler); + } + + @Override + public int syncTimeout() { + return self.getSyncLimit() * self.getTickTime(); + } + + @Override + public int getTickOfNextAckDeadline() { + return self.tick.get() + self.syncLimit; + } + + @Override + 
public int getTickOfInitialAckDeadline() { + return self.tick.get() + self.initLimit + self.syncLimit; + } + + @Override + public long getAndDecrementFollowerCounter() { + return followerCounter.getAndDecrement(); + } + + @Override + public void waitForEpochAck(long sid, StateSummary ss) throws IOException, InterruptedException { + // since this is done by an active follower, we don't need to wait for anything + } + + @Override + public LearnerSnapshotThrottler getLearnerSnapshotThrottler() { + return learnerSnapshotThrottler; + } + + @Override + public void waitForStartup() throws InterruptedException { + // since this is done by an active follower, we don't need to wait for anything + } + + @Override + synchronized public long getLastProposed() { + return lastProposedZxid; + } + + @Override + public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException { + return self.getCurrentEpoch(); + } + + @Override + public ZKDatabase getZKDatabase() { + return zks.getZKDatabase(); + } + + @Override + public void waitForNewLeaderAck(long sid, long zxid) throws InterruptedException { + // no need to wait since we are a follower + } + + @Override + public int getCurrentTick() { + return self.tick.get(); + } + + @Override + public void processAck(long sid, long zxid, SocketAddress localSocketAddress) { + if ((zxid & 0xffffffffL) == 0) { + /* + * We no longer process NEWLEADER ack by this method. However, + * the learner sends ack back to the leader after it gets UPTODATE + * so we just ignore the message. 
+ */ + return; + } + + throw new RuntimeException("Observers shouldn't send ACKS ack = " + Long.toHexString(zxid)); + } + + @Override + public void touch(long sess, int to) { + zks.getSessionTracker().touchSession(sess, to); + } + + boolean revalidateLearnerSession(QuorumPacket qp) throws IOException { + ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData()); + DataInputStream dis = new DataInputStream(bis); + long id = dis.readLong(); + boolean valid = dis.readBoolean(); + Iterator<Revalidation> itr = pendingRevalidations.iterator(); + if (!itr.hasNext()) { + // not a learner session, handle locally + return false; + } + Revalidation revalidation = itr.next(); + if (revalidation.sessionId != id) { + // not a learner session, handle locally + return false; + } + itr.remove(); + LearnerHandler learnerHandler = revalidation.handler; + // create a copy here as the qp object is reused by the Follower and may be mutated + QuorumPacket deepCopy = new QuorumPacket(qp.getType(), qp.getZxid(), + Arrays.copyOf(qp.getData(), qp.getData().length), + qp.getAuthinfo() == null ? null : new ArrayList<>(qp.getAuthinfo())); + learnerHandler.queuePacket(deepCopy); + // To keep consistent as leader, touch the session when it's + // revalidating the session, only update if it's a valid session. 
+ if (valid) { + touch(revalidation.sessionId, revalidation.timeout); + } + return true; + } + + @Override + public void revalidateSession(QuorumPacket qp, LearnerHandler learnerHandler) throws IOException { + ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData()); + DataInputStream dis = new DataInputStream(bis); + long id = dis.readLong(); + int to = dis.readInt(); + synchronized (revalidateSessionLock) { + pendingRevalidations.add(new Revalidation(id, to, learnerHandler)); + Learner learner = zks.getLearner(); + if (learner != null) { + learner.writePacket(qp, true); + } + } + } + + @Override + public void submitLearnerRequest(Request si) { + zks.processObserverRequest(si); + } + + @Override + synchronized public long startForwarding(LearnerHandler learnerHandler, long lastSeenZxid) { + Iterator<QuorumPacket> itr = committedPkts.iterator(); + if (itr.hasNext()) { + QuorumPacket packet = itr.next(); + if (packet.getZxid() > lastSeenZxid + 1) { + LOG.error("LearnerHandler is too far behind ({} < {}), disconnecting {} at {}", Long.toHexString(lastSeenZxid + 1), + Long.toHexString(packet.getZxid()), learnerHandler.getSid(), learnerHandler.getRemoteAddress()); + learnerHandler.shutdown(); + return -1; + } else if (packet.getZxid() == lastSeenZxid + 1) { + learnerHandler.queuePacket(packet); + } + long queueHeadZxid = packet.getZxid(); + long queueBytesUsed = LearnerHandler.packetSize(packet); + while (itr.hasNext()) { + packet = itr.next(); + if (packet.getZxid() <= lastSeenZxid) { + continue; + } + learnerHandler.queuePacket(packet); + queueBytesUsed += LearnerHandler.packetSize(packet); + } + LOG.info("finished syncing observer from retained commit queue: sid {}, " + + "queue head 0x{}, queue tail 0x{}, sync position 0x{}, num packets used {}, " + + "num bytes used {}", + learnerHandler.getSid(), + Long.toHexString(queueHeadZxid), + Long.toHexString(packet.getZxid()), + Long.toHexString(lastSeenZxid), + packet.getZxid() - lastSeenZxid, + queueBytesUsed); 
+ } + activeObservers.add(learnerHandler); + return lastProposedZxid; + } + + @Override + public long getQuorumVerifierVersion() { + return self.getQuorumVerifier().getVersion(); + } + + @Override + public String getPeerInfo(long sid) { + QuorumPeer.QuorumServer server = self.getView().get(sid); + return server == null ? "" : server.toString(); + } + + @Override + public byte[] getQuorumVerifierBytes() { + return self.getLastSeenQuorumVerifier().toString().getBytes(); + } + + @Override + public QuorumAuthServer getQuorumAuthServer() { + return (self == null) ? null : self.authServer; + } + + void proposalReceived(QuorumPacket qp) { + proposedPkts.add(new QuorumPacket(Leader.INFORM, qp.getZxid(), qp.getData(), null)); + } + + private synchronized QuorumPacket removeProposedPacket(long zxid) { + QuorumPacket pkt = proposedPkts.peek(); + if (pkt == null || pkt.getZxid() > zxid) { + LOG.debug("ignore missing proposal packet for {}", Long.toHexString(zxid)); + return null; + } + if (pkt.getZxid() != zxid) { + final String m = String.format("Unexpected proposal packet on commit ack, expected zxid 0x%d got zxid 0x%d", + zxid, pkt.getZxid()); + LOG.error(m); + throw new RuntimeException(m); + } + proposedPkts.remove(); + return pkt; + } + + private synchronized void cacheCommittedPacket(final QuorumPacket pkt) { + committedPkts.add(pkt); + pktsSize += LearnerHandler.packetSize(pkt); + // remove 5 packets for every one added as we near the size limit + for (int i = 0; pktsSize > pktsSizeLimit * 0.8 && i < 5; i++) { + QuorumPacket oldPkt = committedPkts.poll(); + if (oldPkt == null) { + pktsSize = 0; + break; + } + pktsSize -= LearnerHandler.packetSize(oldPkt); + } + // enforce the size limit as a hard cap + while (pktsSize > pktsSizeLimit) { + QuorumPacket oldPkt = committedPkts.poll(); + if (oldPkt == null) { + pktsSize = 0; + break; + } + pktsSize -= LearnerHandler.packetSize(oldPkt); + } + } + + private synchronized void sendPacket(final QuorumPacket pkt) { + for 
+ (LearnerHandler lh: activeObservers) { + lh.queuePacket(pkt); + } + lastProposedZxid = pkt.getZxid(); + } + + synchronized void proposalCommitted(long zxid) { + QuorumPacket pkt = removeProposedPacket(zxid); + if (pkt == null) { + return; + } + cacheCommittedPacket(pkt); + sendPacket(pkt); + } + + synchronized void informAndActivate(long zxid, long suggestedLeaderId) { + QuorumPacket pkt = removeProposedPacket(zxid); + if (pkt == null) { + return; + } + + // Build the INFORMANDACTIVATE packet + QuorumPacket informAndActivateQP = Leader.buildInformAndActivePacket( + zxid, suggestedLeaderId, pkt.getData()); + cacheCommittedPacket(informAndActivateQP); + sendPacket(informAndActivateQP); + } + + synchronized public void start() throws IOException { + if (thread != null && thread.isAlive()) { + return; + } + listenerRunning = true; + if (self.getQuorumListenOnAllIPs()) { + ss = new ServerSocket(port, 10 /* dog science */); + } else { + ss = new ServerSocket(port, 10 /* dog science */, self.getQuorumAddress().getAddress()); + } + thread = new Thread(this, "ObserverMaster"); + thread.start(); + pinger = Executors.newSingleThreadScheduledExecutor(); + pinger.scheduleAtFixedRate(ping, self.tickTime /2, self.tickTime/2, TimeUnit.MILLISECONDS); + } + + public void run() { + while (listenerRunning) { + try { + Socket s = ss.accept(); --- End diff -- One more thing: we recently added a change here to set the socket timeout (setSoTimeout) to initLimit * tickTime, to align with what the leader is doing. We should add it here as well.
---