Github user lvfangmin commented on a diff in the pull request:
https://github.com/apache/zookeeper/pull/628#discussion_r239578960
--- Diff:
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java
---
@@ -0,0 +1,514 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ZKDatabase;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used by Followers to host Observers. This reduces the network load on
the Leader process by pushing
+ * the responsibility for keeping Observers in sync off the leading peer.
+ *
+ * It is expected that Observers will continue to perform the initial
vetting of clients and requests.
+ * Observers send the request to the follower where it is received by an
ObserverMaster.
+ *
+ * The ObserverMaster forwards a copy of the request to the ensemble
Leader and inserts it into its own
+ * request processor pipeline where it can be matched with the response
comes back. All commits received
+ * from the Leader will be forwarded along to every Learner connected to
the ObserverMaster.
+ *
+ * New Learners connecting to a Follower will receive a LearnerHandler
object and be party to its syncing logic
+ * to be brought up to date.
+ *
+ * The logic is quite a bit simpler than the corresponding logic in Leader
because it only hosts observers.
+ */
+public class ObserverMaster implements LearnerMaster, Runnable {
+ private static final Logger LOG =
LoggerFactory.getLogger(ObserverMaster.class);
+
+ //Follower counter
+ private final AtomicLong followerCounter = new AtomicLong(-1);
+
+ private QuorumPeer self;
+ private FollowerZooKeeperServer zks;
+ private int port;
+
+ private Set<LearnerHandler> activeObservers =
Collections.newSetFromMap(new ConcurrentHashMap<LearnerHandler,Boolean>());
+
+ private final ConcurrentHashMap<LearnerHandler, LearnerHandlerBean>
connectionBeans = new ConcurrentHashMap<>();
+
+ /**
+ * we want to keep a log of past txns so that observers can sync up
with us when we connect,
+ * but we can't keep everything in memory, so this limits how much
memory will be dedicated
+ * to keeping recent txns.
+ */
+ private final static int PKTS_SIZE_LIMIT = 32 * 1024 * 1024;
+ private static volatile int pktsSizeLimit =
Integer.getInteger("zookeeper.observerMaster.sizeLimit", PKTS_SIZE_LIMIT);
+ private ConcurrentLinkedQueue<QuorumPacket> proposedPkts = new
ConcurrentLinkedQueue<>();
+ private ConcurrentLinkedQueue<QuorumPacket> committedPkts = new
ConcurrentLinkedQueue<>();
+ private int pktsSize = 0;
+
+ private long lastProposedZxid;
+
+ // ensure ordering of revalidations returned to this learner
+ private final Object revalidateSessionLock = new Object();
+
+ // Throttle when there are too many concurrent snapshots being sent to
observers
+ private static final String MAX_CONCURRENT_SNAPSHOTS =
"zookeeper.leader.maxConcurrentSnapshots";
+ private static final int maxConcurrentSnapshots;
+
+ private static final String MAX_CONCURRENT_DIFFS =
"zookeeper.leader.maxConcurrentDiffs";
+ private static final int maxConcurrentDiffs;
+ static {
+ maxConcurrentSnapshots =
Integer.getInteger(MAX_CONCURRENT_SNAPSHOTS, 10);
+ LOG.info(MAX_CONCURRENT_SNAPSHOTS + " = " +
maxConcurrentSnapshots);
+
+ maxConcurrentDiffs = Integer.getInteger(MAX_CONCURRENT_DIFFS, 100);
+ LOG.info(MAX_CONCURRENT_DIFFS + " = " + maxConcurrentDiffs);
+ }
+
+ private final ConcurrentLinkedQueue<Revalidation> pendingRevalidations
= new ConcurrentLinkedQueue<>();
+ static class Revalidation {
+ public final long sessionId;
+ public final int timeout;
+ public final LearnerHandler handler;
+
+ Revalidation(final Long sessionId, final int timeout, final
LearnerHandler handler) {
+ this.sessionId = sessionId;
+ this.timeout = timeout;
+ this.handler = handler;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ final Revalidation that = (Revalidation) o;
+ return sessionId == that.sessionId && timeout == that.timeout
&& handler.equals(that.handler);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = (int) (sessionId ^ (sessionId >>> 32));
+ result = 31 * result + timeout;
+ result = 31 * result + handler.hashCode();
+ return result;
+ }
+ }
+
+ private final LearnerSnapshotThrottler learnerSnapshotThrottler =
+ new LearnerSnapshotThrottler(maxConcurrentSnapshots);
+
+ private Thread thread;
+ private ServerSocket ss;
+ private boolean listenerRunning;
+ private ScheduledExecutorService pinger;
+
+ Runnable ping = new Runnable() {
+ @Override
+ public void run() {
+ for (LearnerHandler lh: activeObservers) {
+ lh.ping();
+ }
+ }
+ };
+
+ ObserverMaster(QuorumPeer self, FollowerZooKeeperServer zks, int port)
{
+ this.self = self;
+ this.zks = zks;
+ this.port = port;
+ }
+
+ @Override
+ public void addLearnerHandler(LearnerHandler learnerHandler) {
+ if (!listenerRunning) {
+ throw new RuntimeException(("ObserverMaster is not running"));
+ }
+ }
+
+ @Override
+ public void removeLearnerHandler(LearnerHandler learnerHandler) {
+ activeObservers.remove(learnerHandler);
+ }
+
+ @Override
+ public int syncTimeout() {
+ return self.getSyncLimit() * self.getTickTime();
+ }
+
+ @Override
+ public int getTickOfNextAckDeadline() {
+ return self.tick.get() + self.syncLimit;
+ }
+
+ @Override
+ public int getTickOfInitialAckDeadline() {
+ return self.tick.get() + self.initLimit + self.syncLimit;
+ }
+
+ @Override
+ public long getAndDecrementFollowerCounter() {
+ return followerCounter.getAndDecrement();
+ }
+
+ @Override
+ public void waitForEpochAck(long sid, StateSummary ss) throws
IOException, InterruptedException {
+ // since this is done by an active follower, we don't need to wait
for anything
+ }
+
+ @Override
+ public LearnerSnapshotThrottler getLearnerSnapshotThrottler() {
+ return learnerSnapshotThrottler;
+ }
+
+ @Override
+ public void waitForStartup() throws InterruptedException {
+ // since this is done by an active follower, we don't need to wait
for anything
+ }
+
+ @Override
+ synchronized public long getLastProposed() {
+ return lastProposedZxid;
+ }
+
+ @Override
+ public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws
InterruptedException, IOException {
+ return self.getCurrentEpoch();
+ }
+
+ @Override
+ public ZKDatabase getZKDatabase() {
+ return zks.getZKDatabase();
+ }
+
+ @Override
+ public void waitForNewLeaderAck(long sid, long zxid) throws
InterruptedException {
+ // no need to wait since we are a follower
+ }
+
+ @Override
+ public int getCurrentTick() {
+ return self.tick.get();
+ }
+
+ @Override
+ public void processAck(long sid, long zxid, SocketAddress
localSocketAddress) {
+ if ((zxid & 0xffffffffL) == 0) {
+ /*
+ * We no longer process NEWLEADER ack by this method. However,
+ * the learner sends ack back to the leader after it gets
UPTODATE
+ * so we just ignore the message.
+ */
+ return;
+ }
+
+ throw new RuntimeException("Observers shouldn't send ACKS ack = "
+ Long.toHexString(zxid));
+ }
+
+ @Override
+ public void touch(long sess, int to) {
+ zks.getSessionTracker().touchSession(sess, to);
+ }
+
+ boolean revalidateLearnerSession(QuorumPacket qp) throws IOException {
+ ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
+ DataInputStream dis = new DataInputStream(bis);
+ long id = dis.readLong();
+ boolean valid = dis.readBoolean();
+ Iterator<Revalidation> itr = pendingRevalidations.iterator();
+ if (!itr.hasNext()) {
+ // not a learner session, handle locally
+ return false;
+ }
+ Revalidation revalidation = itr.next();
+ if (revalidation.sessionId != id) {
+ // not a learner session, handle locally
+ return false;
+ }
+ itr.remove();
+ LearnerHandler learnerHandler = revalidation.handler;
+ // create a copy here as the qp object is reused by the Follower
and may be mutated
+ QuorumPacket deepCopy = new QuorumPacket(qp.getType(),
qp.getZxid(),
+ Arrays.copyOf(qp.getData(), qp.getData().length),
+ qp.getAuthinfo() == null ? null : new
ArrayList<>(qp.getAuthinfo()));
+ learnerHandler.queuePacket(deepCopy);
+ // To keep consistent as leader, touch the session when it's
+ // revalidating the session, only update if it's a valid session.
+ if (valid) {
+ touch(revalidation.sessionId, revalidation.timeout);
+ }
+ return true;
+ }
+
+ @Override
+ public void revalidateSession(QuorumPacket qp, LearnerHandler
learnerHandler) throws IOException {
+ ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
+ DataInputStream dis = new DataInputStream(bis);
+ long id = dis.readLong();
+ int to = dis.readInt();
+ synchronized (revalidateSessionLock) {
+ pendingRevalidations.add(new Revalidation(id, to,
learnerHandler));
+ Learner learner = zks.getLearner();
+ if (learner != null) {
+ learner.writePacket(qp, true);
+ }
+ }
+ }
+
+ @Override
+ public void submitLearnerRequest(Request si) {
+ zks.processObserverRequest(si);
+ }
+
+ @Override
+ synchronized public long startForwarding(LearnerHandler
learnerHandler, long lastSeenZxid) {
+ Iterator<QuorumPacket> itr = committedPkts.iterator();
+ if (itr.hasNext()) {
+ QuorumPacket packet = itr.next();
+ if (packet.getZxid() > lastSeenZxid + 1) {
+ LOG.error("LearnerHandler is too far behind ({} < {}),
disconnecting {} at {}", Long.toHexString(lastSeenZxid + 1),
+ Long.toHexString(packet.getZxid()),
learnerHandler.getSid(), learnerHandler.getRemoteAddress());
+ learnerHandler.shutdown();
+ return -1;
+ } else if (packet.getZxid() == lastSeenZxid + 1) {
+ learnerHandler.queuePacket(packet);
+ }
+ long queueHeadZxid = packet.getZxid();
+ long queueBytesUsed = LearnerHandler.packetSize(packet);
+ while (itr.hasNext()) {
+ packet = itr.next();
+ if (packet.getZxid() <= lastSeenZxid) {
+ continue;
+ }
+ learnerHandler.queuePacket(packet);
+ queueBytesUsed += LearnerHandler.packetSize(packet);
+ }
+ LOG.info("finished syncing observer from retained commit
queue: sid {}, " +
+ "queue head 0x{}, queue tail 0x{}, sync
position 0x{}, num packets used {}, " +
+ "num bytes used {}",
+ learnerHandler.getSid(),
+ Long.toHexString(queueHeadZxid),
+ Long.toHexString(packet.getZxid()),
+ Long.toHexString(lastSeenZxid),
+ packet.getZxid() - lastSeenZxid,
+ queueBytesUsed);
+ }
+ activeObservers.add(learnerHandler);
+ return lastProposedZxid;
+ }
+
+ @Override
+ public long getQuorumVerifierVersion() {
+ return self.getQuorumVerifier().getVersion();
+ }
+
+ @Override
+ public String getPeerInfo(long sid) {
+ QuorumPeer.QuorumServer server = self.getView().get(sid);
+ return server == null ? "" : server.toString();
+ }
+
+ @Override
+ public byte[] getQuorumVerifierBytes() {
+ return self.getLastSeenQuorumVerifier().toString().getBytes();
+ }
+
+ @Override
+ public QuorumAuthServer getQuorumAuthServer() {
+ return (self == null) ? null : self.authServer;
+ }
+
+ void proposalReceived(QuorumPacket qp) {
+ proposedPkts.add(new QuorumPacket(Leader.INFORM, qp.getZxid(),
qp.getData(), null));
+ }
+
+ private synchronized QuorumPacket removeProposedPacket(long zxid) {
+ QuorumPacket pkt = proposedPkts.peek();
+ if (pkt == null || pkt.getZxid() > zxid) {
+ LOG.debug("ignore missing proposal packet for {}",
Long.toHexString(zxid));
+ return null;
+ }
+ if (pkt.getZxid() != zxid) {
+ final String m = String.format("Unexpected proposal packet on
commit ack, expected zxid 0x%d got zxid 0x%d",
+ zxid, pkt.getZxid());
+ LOG.error(m);
+ throw new RuntimeException(m);
+ }
+ proposedPkts.remove();
+ return pkt;
+ }
+
+ private synchronized void cacheCommittedPacket(final QuorumPacket pkt)
{
+ committedPkts.add(pkt);
+ pktsSize += LearnerHandler.packetSize(pkt);
+ // remove 5 packets for every one added as we near the size limit
+ for (int i = 0; pktsSize > pktsSizeLimit * 0.8 && i < 5; i++) {
+ QuorumPacket oldPkt = committedPkts.poll();
+ if (oldPkt == null) {
+ pktsSize = 0;
+ break;
+ }
+ pktsSize -= LearnerHandler.packetSize(oldPkt);
+ }
+ // enforce the size limit as a hard cap
+ while (pktsSize > pktsSizeLimit) {
+ QuorumPacket oldPkt = committedPkts.poll();
+ if (oldPkt == null) {
+ pktsSize = 0;
+ break;
+ }
+ pktsSize -= LearnerHandler.packetSize(oldPkt);
+ }
+ }
+
+ private synchronized void sendPacket(final QuorumPacket pkt) {
+ for (LearnerHandler lh: activeObservers) {
+ lh.queuePacket(pkt);
+ }
+ lastProposedZxid = pkt.getZxid();
+ }
+
+ synchronized void proposalCommitted(long zxid) {
+ QuorumPacket pkt = removeProposedPacket(zxid);
+ if (pkt == null) {
+ return;
+ }
+ cacheCommittedPacket(pkt);
+ sendPacket(pkt);
+ }
+
+ synchronized void informAndActivate(long zxid, long suggestedLeaderId)
{
+ QuorumPacket pkt = removeProposedPacket(zxid);
+ if (pkt == null) {
+ return;
+ }
+
+ // Build the INFORMANDACTIVATE packet
+ QuorumPacket informAndActivateQP =
Leader.buildInformAndActivePacket(
+ zxid, suggestedLeaderId, pkt.getData());
+ cacheCommittedPacket(informAndActivateQP);
+ sendPacket(informAndActivateQP);
+ }
+
+ synchronized public void start() throws IOException {
+ if (thread != null && thread.isAlive()) {
+ return;
+ }
+ listenerRunning = true;
+ if (self.getQuorumListenOnAllIPs()) {
+ ss = new ServerSocket(port, 10 /* dog science */);
+ } else {
+ ss = new ServerSocket(port, 10 /* dog science */,
self.getQuorumAddress().getAddress());
+ }
+ thread = new Thread(this, "ObserverMaster");
+ thread.start();
+ pinger = Executors.newSingleThreadScheduledExecutor();
+ pinger.scheduleAtFixedRate(ping, self.tickTime /2,
self.tickTime/2, TimeUnit.MILLISECONDS);
+ }
+
+ public void run() {
+ while (listenerRunning) {
+ try {
+ Socket s = ss.accept();
--- End diff --
One more thing: we recently added a change here to call setSoTimeout with
initLimit * tickTime, to align with what the leader is doing.
We should add it here as well.
---