Github user lvfangmin commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/628#discussion_r219581743
  
    --- Diff: 
src/java/main/org/apache/zookeeper/server/quorum/ObserverMaster.java ---
    @@ -0,0 +1,513 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.quorum;
    +
    +import org.apache.zookeeper.jmx.MBeanRegistry;
    +import org.apache.zookeeper.server.Request;
    +import org.apache.zookeeper.server.ZKDatabase;
    +
    +import java.io.BufferedInputStream;
    +import java.io.ByteArrayInputStream;
    +import java.io.DataInputStream;
    +import java.io.IOException;
    +import java.net.ServerSocket;
    +import java.net.Socket;
    +import java.net.SocketAddress;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ConcurrentLinkedQueue;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Used by Followers to host Observers. This reduces the network load on 
the Leader process by pushing
    + * the responsibility for keeping Observers in sync off the leading peer.
    + *
    + * It is expected that Observers will continue to perform the initial 
vetting of clients and requests.
    + * Observers send the request to the follower where it is received by an 
ObserverMaster.
    + *
    + * The ObserverMaster forwards a copy of the request to the ensemble 
Leader and inserts it into its own
    + * request processor pipeline where it can be matched with the response 
when it comes back. All commits received
    + * from the Leader will be forwarded along to every Learner connected to 
the ObserverMaster.
    + *
    + * New Learners connecting to a Follower will receive a LearnerHandler 
object and be party to its syncing logic
    + * to be brought up to date.
    + *
    + * The logic is quite a bit simpler than the corresponding logic in Leader 
because it only hosts observers.
    + */
    +public class ObserverMaster implements LearnerMaster, Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ObserverMaster.class);

    // Counter handed out by getAndDecrementFollowerCounter(); starts at -1 and counts down.
    private final AtomicLong followerCounter = new AtomicLong(-1);

    private QuorumPeer self;
    private FollowerZooKeeperServer zks;
    private int port;

    // Learner handlers that the ping task iterates over. Backed by a ConcurrentHashMap so it
    // can be iterated while handlers are concurrently added or removed.
    private Set<LearnerHandler> activeObservers =
            Collections.newSetFromMap(new ConcurrentHashMap<LearnerHandler, Boolean>());

    private final ConcurrentHashMap<LearnerHandler, LearnerHandlerBean> connectionBeans =
            new ConcurrentHashMap<>();

    /**
     * We want to keep a log of past txns so that observers can sync up with us when they
     * connect, but we can't keep everything in memory, so this limits how much memory will
     * be dedicated to keeping recent txns. The default (32 MiB) can be overridden with the
     * {@code zookeeper.observerMaster.sizeLimit} system property.
     */
    private static final int PKTS_SIZE_LIMIT = 32 * 1024 * 1024;
    private static volatile int pktsSizeLimit =
            Integer.getInteger("zookeeper.observerMaster.sizeLimit", PKTS_SIZE_LIMIT);
    private ConcurrentLinkedQueue<QuorumPacket> proposedPkts = new ConcurrentLinkedQueue<>();
    private ConcurrentLinkedQueue<QuorumPacket> committedPkts = new ConcurrentLinkedQueue<>();
    private int pktsSize = 0;

    private long lastProposedZxid;

    // ensure ordering of revalidations returned to this learner
    private final Object revalidateSessionLock = new Object();

    // Throttle when there are too many concurrent snapshots being sent to observers.
    // NOTE: the property names are shared with the Leader ("zookeeper.leader.*").
    private static final String MAX_CONCURRENT_SNAPSHOTS = "zookeeper.leader.maxConcurrentSnapshots";
    private static final int maxConcurrentSnapshots;

    private static final String MAX_CONCURRENT_DIFFS = "zookeeper.leader.maxConcurrentDiffs";
    private static final int maxConcurrentDiffs;

    static {
        maxConcurrentSnapshots = Integer.getInteger(MAX_CONCURRENT_SNAPSHOTS, 10);
        // Parameterized logging: no string building, same rendered message.
        LOG.info("{} = {}", MAX_CONCURRENT_SNAPSHOTS, maxConcurrentSnapshots);

        maxConcurrentDiffs = Integer.getInteger(MAX_CONCURRENT_DIFFS, 100);
        LOG.info("{} = {}", MAX_CONCURRENT_DIFFS, maxConcurrentDiffs);
    }

    // Outstanding learner session revalidations. revalidateLearnerSession() only compares a
    // response against the head of this queue, so entries are presumably expected to arrive
    // in response order — confirm against the code that enqueues them.
    private final ConcurrentLinkedQueue<Revalidation> pendingRevalidations =
            new ConcurrentLinkedQueue<>();
    +    static class Revalidation {
    +        public final long sessionId;
    +        public final int timeout;
    +        public final LearnerHandler handler;
    +
    +        Revalidation(final Long sessionId, final int timeout, final 
LearnerHandler handler) {
    +            this.sessionId = sessionId;
    +            this.timeout = timeout;
    +            this.handler = handler;
    +        }
    +
    +        @Override
    +        public boolean equals(Object o) {
    +            if (this == o) return true;
    +            if (o == null || getClass() != o.getClass()) return false;
    +
    +            final Revalidation that = (Revalidation) o;
    +            return sessionId == that.sessionId && timeout == that.timeout 
&& handler.equals(that.handler);
    +        }
    +
    +        @Override
    +        public int hashCode() {
    +            int result = (int) (sessionId ^ (sessionId >>> 32));
    +            result = 31 * result + timeout;
    +            result = 31 * result + handler.hashCode();
    +            return result;
    +        }
    +    }
    +
    +    private final LearnerSnapshotThrottler learnerSnapshotThrottler =
    +            new LearnerSnapshotThrottler(maxConcurrentSnapshots);
    +
    +    private Thread thread;
    +    private ServerSocket ss;
    +    private boolean listenerRunning;
    +    private ScheduledExecutorService pinger;
    +
    +    Runnable ping = new Runnable() {
    +        @Override
    +        public void run() {
    +            for (LearnerHandler lh: activeObservers) {
    +                lh.ping();
    +            }
    +        }
    +    };
    +
    /**
     * Creates an ObserverMaster bound to a follower.
     *
     * @param self the local quorum peer
     * @param zks  the follower's ZooKeeper server
     * @param port port stored for this ObserverMaster (presumably its listen port — confirm)
     */
    ObserverMaster(QuorumPeer self, FollowerZooKeeperServer zks, int port) {
        this.port = port;
        this.zks = zks;
        this.self = self;
    }
    +
    +    @Override
    +    public void addLearnerHandler(LearnerHandler learnerHandler) {
    +        if (!listenerRunning) {
    +            throw new RuntimeException(("ObserverMaster is not running"));
    +        }
    +    }
    +
    +    @Override
    +    public void removeLearnerHandler(LearnerHandler learnerHandler) {
    +        activeObservers.remove(learnerHandler);
    +    }
    +
    +    @Override
    +    public int syncTimeout() {
    +        return self.getSyncLimit() * self.getTickTime();
    +    }
    +
    +    @Override
    +    public int getTickOfNextAckDeadline() {
    +        return self.tick.get() + self.syncLimit;
    +    }
    +
    +    @Override
    +    public int getTickOfInitialAckDeadline() {
    +        return self.tick.get() + self.initLimit + self.syncLimit;
    +    }
    +
    +    @Override
    +    public long getAndDecrementFollowerCounter() {
    +        return followerCounter.getAndDecrement();
    +    }
    +
    +    @Override
    +    public void waitForEpochAck(long sid, StateSummary ss) throws 
IOException, InterruptedException {
    +        // since this is done by an active follower, we don't need to wait 
for anything
    +    }
    +
    +    @Override
    +    public LearnerSnapshotThrottler getLearnerSnapshotThrottler() {
    +        return learnerSnapshotThrottler;
    +    }
    +
    +    @Override
    +    public void waitForStartup() throws InterruptedException {
    +        // since this is done by an active follower, we don't need to wait 
for anything
    +    }
    +
    +    @Override
    +    synchronized public long getLastProposed() {
    +        return lastProposedZxid;
    +    }
    +
    +    @Override
    +    public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws 
InterruptedException, IOException {
    +        return self.getCurrentEpoch();
    +    }
    +
    +    @Override
    +    public ZKDatabase getZKDatabase() {
    +        return zks.getZKDatabase();
    +    }
    +
    +    @Override
    +    public void waitForNewLeaderAck(long sid, long zxid) throws 
InterruptedException {
    +        // no need to wait since we are a follower
    +    }
    +
    +    @Override
    +    public int getCurrentTick() {
    +        return self.tick.get();
    +    }
    +
    +    @Override
    +    public void processAck(long sid, long zxid, SocketAddress 
localSocketAddress) {
    +        if ((zxid & 0xffffffffL) == 0) {
    +            /*
    +             * We no longer process NEWLEADER ack by this method. However,
    +             * the learner sends ack back to the leader after it gets 
UPTODATE
    +             * so we just ignore the message.
    +             */
    +            return;
    +        }
    +
    +        throw new RuntimeException("Observers shouldn't send ACKS ack = " 
+ Long.toHexString(zxid));
    +    }
    +
    +    @Override
    +    public void touch(long sess, int to) {
    +        zks.getSessionTracker().touchSession(sess, to);
    +    }
    +
    +    boolean revalidateLearnerSession(QuorumPacket qp) throws IOException {
    +        ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
    +        DataInputStream dis = new DataInputStream(bis);
    +        long id = dis.readLong();
    +        boolean valid = dis.readBoolean();
    +        Iterator<Revalidation> itr = pendingRevalidations.iterator();
    +        if (!itr.hasNext()) {
    +            // not a learner session, handle locally
    +            return false;
    +        }
    +        Revalidation revalidation = itr.next();
    +        if (revalidation.sessionId != id) {
    +            // not a learner session, handle locally
    +            return false;
    +        }
    +        itr.remove();
    +        LearnerHandler learnerHandler = revalidation.handler;
    +        QuorumPacket deepCopy = new QuorumPacket(qp.getType(), 
qp.getZxid(),
    --- End diff --
    
    Brian, can you add a comment here explaining why we need a deep copy of the 
QuorumPacket? The qp object is reused to reduce GC pressure in the learner when it 
receives packets from the leader. If we used that packet directly while the LearnerHandler 
is sending it asynchronously, its contents might already have changed. 


---

Reply via email to