This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new b2f5548  ZOOKEEPER-3418: Improve quorum throughput through eager ACL 
checks of requests on local servers.
b2f5548 is described below

commit b2f5548bd5757edbf1887838a487a90523ed2b52
Author: Michael Han <l...@twitter.com>
AuthorDate: Thu Aug 1 07:31:29 2019 +0200

    ZOOKEEPER-3418: Improve quorum throughput through eager ACL checks of 
requests on local servers.
    
    Serving write requests that change the state of the system requires quorum 
operations, and in some cases, the quorum operations can be avoided if the 
requests are doomed to fail. ACL check failure is such a case. To optimize for 
this case, we elevate the ACL check logic and perform eager ACL check on local 
server (where the requests are received), and fail fast, before sending the 
requests to leader.
    
    As with any features, there is a feature flag that can control this feature 
on, or off (default). This feature is also forward compatible in that for new 
any new Op code (and some existing Op code we did not explicit check against), 
they will pass the check and (potentially) fail on leader side, instead of 
being prematurely filtered out on local server.
    
    The end result is better throughput and stability of the quorum for certain 
workloads.
    
    Author: Michael Han <l...@twitter.com>
    
    Reviewers: Andor Molnar <an...@apache.org>, Enrico Olivelli 
<eolive...@apache.org>
    
    Closes #971 from hanm/twitter/5c6bbfac811d29228fd97a4621fd6ba547fed178
---
 .../src/main/resources/markdown/zookeeperAdmin.md  |   5 +
 .../java/org/apache/zookeeper/server/DataTree.java |   4 +-
 .../zookeeper/server/FinalRequestProcessor.java    |  12 +-
 .../zookeeper/server/PrepRequestProcessor.java     |  77 +-----
 .../apache/zookeeper/server/ZooKeeperServer.java   | 230 +++++++++++++++++
 .../server/quorum/FollowerRequestProcessor.java    |   6 +
 .../server/quorum/LeaderRequestProcessor.java      |   5 +
 .../server/quorum/ObserverRequestProcessor.java    |   6 +
 .../server/PrepRequestProcessorMetricsTest.java    |   8 +-
 .../server/quorum/EagerACLFilterTest.java          | 280 +++++++++++++++++++++
 .../java/org/apache/zookeeper/test/QuorumBase.java |   9 +
 11 files changed, 566 insertions(+), 76 deletions(-)

diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md 
b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index eea7e52..7a4773f 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -893,6 +893,11 @@ property, when available, is noted below.
     pipeline to avoid direct buffer OOM. It will disable the AUTO_READ in
     Netty.
 
+* *enableEagerACLCheck* :
+    (Java system property only: **zookeeper.enableEagerACLCheck**)
+    When set to "true", enables eager ACL check on write requests on each local
+    server before sending the requests to quorum. Default is "false".
+
 * *maxConcurrentSnapSyncs* :
     (Java system property: **zookeeper.leader.maxConcurrentSnapSyncs**)
     The maximum number of snap syncs a leader or a follower can serve at the 
same
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
index 0ffad0f..1c7f4dc 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
@@ -801,7 +801,9 @@ public class DataTree {
             throw new KeeperException.NoNodeException();
         }
         synchronized (n) {
-            n.copyStat(stat);
+            if (stat != null) {
+                n.copyStat(stat);
+            }
             return new ArrayList<ACL>(aclCache.convertLong(n.acl));
         }
     }
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
index d210d70..962a267 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
@@ -419,7 +419,7 @@ public class FinalRequestProcessor implements 
RequestProcessor {
                 if (n == null) {
                     throw new KeeperException.NoNodeException();
                 }
-                PrepRequestProcessor.checkACL(zks, request.cnxn, 
zks.getZKDatabase().aclForNode(n),
+                zks.checkACL(request.cnxn, zks.getZKDatabase().aclForNode(n),
                         ZooDefs.Perms.READ | ZooDefs.Perms.ADMIN,
                         request.authInfo, path, null);
 
@@ -429,7 +429,7 @@ public class FinalRequestProcessor implements 
RequestProcessor {
                 requestPathMetricsCollector.registerRequest(request.type, 
getACLRequest.getPath());
 
                 try {
-                    PrepRequestProcessor.checkACL(zks, request.cnxn, 
zks.getZKDatabase().aclForNode(n),
+                    zks.checkACL(request.cnxn, 
zks.getZKDatabase().aclForNode(n),
                             ZooDefs.Perms.ADMIN,
                             request.authInfo, path, null);
                     rsp = new GetACLResponse(acl, stat);
@@ -469,7 +469,7 @@ public class FinalRequestProcessor implements 
RequestProcessor {
                 if (n == null) {
                     throw new KeeperException.NoNodeException();
                 }
-                PrepRequestProcessor.checkACL(zks, request.cnxn, 
zks.getZKDatabase().aclForNode(n),
+                zks.checkACL(request.cnxn, zks.getZKDatabase().aclForNode(n),
                         ZooDefs.Perms.READ,
                         request.authInfo, path, null);
                 int number = zks.getZKDatabase().getAllChildrenNumber(path);
@@ -487,7 +487,7 @@ public class FinalRequestProcessor implements 
RequestProcessor {
                 if (n == null) {
                     throw new KeeperException.NoNodeException();
                 }
-                PrepRequestProcessor.checkACL(zks, request.cnxn, 
zks.getZKDatabase().aclForNode(n),
+                zks.checkACL(request.cnxn, zks.getZKDatabase().aclForNode(n),
                         ZooDefs.Perms.READ,
                         request.authInfo, path, null);
                 List<String> children = zks.getZKDatabase().getChildren(
@@ -613,7 +613,7 @@ public class FinalRequestProcessor implements 
RequestProcessor {
         if (n == null) {
             throw new KeeperException.NoNodeException();
         }
-        PrepRequestProcessor.checkACL(zks, cnxn, 
zks.getZKDatabase().aclForNode(n),
+        zks.checkACL(cnxn, zks.getZKDatabase().aclForNode(n),
                 ZooDefs.Perms.READ, authInfo, path, null);
         List<String> children = zks.getZKDatabase().getChildren(path, null,
                 getChildrenRequest.getWatch() ? cnxn : null);
@@ -628,7 +628,7 @@ public class FinalRequestProcessor implements 
RequestProcessor {
         if (n == null) {
             throw new KeeperException.NoNodeException();
         }
-        PrepRequestProcessor.checkACL(zks, cnxn, 
zks.getZKDatabase().aclForNode(n),
+        zks.checkACL(cnxn, zks.getZKDatabase().aclForNode(n),
                 ZooDefs.Perms.READ, authInfo, path, null);
         Stat stat = new Stat();
         byte b[] = zks.getZKDatabase().getData(path, stat,
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
index 6686736..11d3aad 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
@@ -92,14 +92,6 @@ public class PrepRequestProcessor extends 
ZooKeeperCriticalThread implements
         RequestProcessor {
     private static final Logger LOG = 
LoggerFactory.getLogger(PrepRequestProcessor.class);
 
-    static boolean skipACL;
-    static {
-        skipACL = System.getProperty("zookeeper.skipACL", "no").equals("yes");
-        if (skipACL) {
-            LOG.info("zookeeper.skipACL==\"yes\", ACL checks will be skipped");
-        }
-    }
-
     /**
      * this is only for testing purposes.
      * should never be used otherwise
@@ -287,57 +279,6 @@ public class PrepRequestProcessor extends 
ZooKeeperCriticalThread implements
     }
 
     /**
-     * Grant or deny authorization to an operation on a node as a function of:
-     * @param zks :     the ZooKeeper server
-     * @param cnxn :    the server connection
-     * @param acl :     set of ACLs for the node
-     * @param perm :    the permission that the client is requesting
-     * @param ids :     the credentials supplied by the client
-     * @param path :    the ZNode path
-     * @param setAcls : for set ACL operations, the list of ACLs being set. 
Otherwise null.
-     */
-    static void checkACL(ZooKeeperServer zks, ServerCnxn cnxn, List<ACL> acl, 
int perm, List<Id> ids,
-                         String path, List<ACL> setAcls) throws 
KeeperException.NoAuthException {
-        if (skipACL) {
-            return;
-        }
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Permission requested: {} ", perm);
-            LOG.debug("ACLs for node: {}", acl);
-            LOG.debug("Client credentials: {}", ids);
-        }
-        if (acl == null || acl.size() == 0) {
-            return;
-        }
-        for (Id authId : ids) {
-            if (authId.getScheme().equals("super")) {
-                return;
-            }
-        }
-        for (ACL a : acl) {
-            Id id = a.getId();
-            if ((a.getPerms() & perm) != 0) {
-                if (id.getScheme().equals("world")
-                        && id.getId().equals("anyone")) {
-                    return;
-                }
-                ServerAuthenticationProvider ap = 
ProviderRegistry.getServerProvider(id
-                        .getScheme());
-                if (ap != null) {
-                    for (Id authId : ids) {
-                        if (authId.getScheme().equals(id.getScheme())
-                                && ap.matches(new 
ServerAuthenticationProvider.ServerObjs(zks, cnxn),
-                                new 
ServerAuthenticationProvider.MatchValues(path, authId.getId(), id.getId(), 
perm, setAcls))) {
-                            return;
-                        }
-                    }
-                }
-            }
-        }
-        throw new KeeperException.NoAuthException();
-    }
-
-    /**
      * Performs basic validation of a path for a create request.
      * Throws if the path is not valid and returns the parent path.
      * @throws BadArgumentsException
@@ -403,7 +344,7 @@ public class PrepRequestProcessor extends 
ZooKeeperCriticalThread implements
                 String path = deleteRequest.getPath();
                 String parentPath = getParentPathAndValidate(path);
                 ChangeRecord parentRecord = getRecordForPath(parentPath);
-                checkACL(zks, request.cnxn, parentRecord.acl, 
ZooDefs.Perms.DELETE, request.authInfo, path, null);
+                zks.checkACL(request.cnxn, parentRecord.acl, 
ZooDefs.Perms.DELETE, request.authInfo, path, null);
                 ChangeRecord nodeRecord = getRecordForPath(path);
                 checkAndIncVersion(nodeRecord.stat.getVersion(), 
deleteRequest.getVersion(), path);
                 if (nodeRecord.childCount > 0) {
@@ -423,7 +364,7 @@ public class PrepRequestProcessor extends 
ZooKeeperCriticalThread implements
                 path = setDataRequest.getPath();
                 validatePath(path, request.sessionId);
                 nodeRecord = getRecordForPath(path);
-                checkACL(zks, request.cnxn, nodeRecord.acl, 
ZooDefs.Perms.WRITE, request.authInfo, path, null);
+                zks.checkACL(request.cnxn, nodeRecord.acl, 
ZooDefs.Perms.WRITE, request.authInfo, path, null);
                 int newVersion = 
checkAndIncVersion(nodeRecord.stat.getVersion(), setDataRequest.getVersion(), 
path);
                 request.setTxn(new SetDataTxn(path, setDataRequest.getData(), 
newVersion));
                 nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
@@ -436,7 +377,7 @@ public class PrepRequestProcessor extends 
ZooKeeperCriticalThread implements
                     throw new KeeperException.ReconfigDisabledException();
                 }
 
-                if (skipACL) {
+                if (ZooKeeperServer.skipACL) {
                     LOG.warn("skipACL is set, reconfig operation will skip ACL 
checks!");
                 }
 
@@ -557,7 +498,7 @@ public class PrepRequestProcessor extends 
ZooKeeperCriticalThread implements
                 }
 
                 nodeRecord = getRecordForPath(ZooDefs.CONFIG_NODE);
-                checkACL(zks, request.cnxn, nodeRecord.acl, 
ZooDefs.Perms.WRITE, request.authInfo, null, null);
+                zks.checkACL(request.cnxn, nodeRecord.acl, 
ZooDefs.Perms.WRITE, request.authInfo, null, null);
                 request.setTxn(new SetDataTxn(ZooDefs.CONFIG_NODE, 
request.qv.toString().getBytes(), -1));
                 nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
                 nodeRecord.stat.setVersion(-1);
@@ -572,7 +513,7 @@ public class PrepRequestProcessor extends 
ZooKeeperCriticalThread implements
                 validatePath(path, request.sessionId);
                 List<ACL> listACL = fixupACL(path, request.authInfo, 
setAclRequest.getAcl());
                 nodeRecord = getRecordForPath(path);
-                checkACL(zks, request.cnxn, nodeRecord.acl, 
ZooDefs.Perms.ADMIN, request.authInfo, path, listACL);
+                zks.checkACL(request.cnxn, nodeRecord.acl, 
ZooDefs.Perms.ADMIN, request.authInfo, path, listACL);
                 newVersion = checkAndIncVersion(nodeRecord.stat.getAversion(), 
setAclRequest.getVersion(), path);
                 request.setTxn(new SetACLTxn(path, listACL, newVersion));
                 nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
@@ -621,7 +562,7 @@ public class PrepRequestProcessor extends 
ZooKeeperCriticalThread implements
                 path = checkVersionRequest.getPath();
                 validatePath(path, request.sessionId);
                 nodeRecord = getRecordForPath(path);
-                checkACL(zks, request.cnxn, nodeRecord.acl, 
ZooDefs.Perms.READ, request.authInfo, path, null);
+                zks.checkACL(request.cnxn, nodeRecord.acl, ZooDefs.Perms.READ, 
request.authInfo, path, null);
                 request.setTxn(new CheckVersionTxn(path, 
checkAndIncVersion(nodeRecord.stat.getVersion(),
                         checkVersionRequest.getVersion(), path)));
                 break;
@@ -663,7 +604,7 @@ public class PrepRequestProcessor extends 
ZooKeeperCriticalThread implements
         List<ACL> listACL = fixupACL(path, request.authInfo, acl);
         ChangeRecord parentRecord = getRecordForPath(parentPath);
 
-        checkACL(zks, request.cnxn, parentRecord.acl, ZooDefs.Perms.CREATE, 
request.authInfo, path, listACL);
+        zks.checkACL(request.cnxn, parentRecord.acl, ZooDefs.Perms.CREATE, 
request.authInfo, path, listACL);
         int parentCVersion = parentRecord.stat.getCversion();
         if (createMode.isSequential()) {
             path = path + String.format(Locale.ENGLISH, "%010d", 
parentCVersion);
@@ -914,7 +855,7 @@ public class PrepRequestProcessor extends 
ZooKeeperCriticalThread implements
         nextProcessor.processRequest(request);
     }
 
-    private List<ACL> removeDuplicates(final List<ACL> acls) {
+    private static List<ACL> removeDuplicates(final List<ACL> acls) {
         if (acls == null || acls.isEmpty()) {
           return Collections.emptyList();
         }
@@ -964,7 +905,7 @@ public class PrepRequestProcessor extends 
ZooKeeperCriticalThread implements
      * @return verified and expanded ACLs
      * @throws KeeperException.InvalidACLException
      */
-    private List<ACL> fixupACL(String path, List<Id> authInfo, List<ACL> acls)
+    public static List<ACL> fixupACL(String path, List<Id> authInfo, List<ACL> 
acls)
         throws KeeperException.InvalidACLException {
         // check for well formed ACLs
         // This resolves https://issues.apache.org/jira/browse/ZOOKEEPER-1877
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index b74507b..e580ab5 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -49,6 +49,7 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.KeeperException.SessionExpiredException;
 import org.apache.zookeeper.Version;
+import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.ZookeeperBanner;
 import org.apache.zookeeper.common.Time;
@@ -60,9 +61,13 @@ import org.apache.zookeeper.metrics.MetricsContext;
 import org.apache.zookeeper.proto.AuthPacket;
 import org.apache.zookeeper.proto.ConnectRequest;
 import org.apache.zookeeper.proto.ConnectResponse;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.proto.DeleteRequest;
 import org.apache.zookeeper.proto.GetSASLRequest;
 import org.apache.zookeeper.proto.ReplyHeader;
 import org.apache.zookeeper.proto.RequestHeader;
+import org.apache.zookeeper.proto.SetACLRequest;
+import org.apache.zookeeper.proto.SetDataRequest;
 import org.apache.zookeeper.proto.SetSASLResponse;
 import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
 import org.apache.zookeeper.server.RequestProcessor.RequestProcessorException;
@@ -90,6 +95,16 @@ public class ZooKeeperServer implements SessionExpirer, 
ServerStats.Provider {
     protected static final Logger LOG;
 
     public static final String GLOBAL_OUTSTANDING_LIMIT = 
"zookeeper.globalOutstandingLimit";
+
+    public static final String ENABLE_EAGER_ACL_CHECK = 
"zookeeper.enableEagerACLCheck";
+    public static final String SKIP_ACL = "zookeeper.skipACL";
+
+    // When enabled, will check ACL constraints appertained to the requests 
first,
+    // before sending the requests to the quorum.
+    static final boolean enableEagerACLCheck;
+
+    static final boolean skipACL;
+
     public static final String ALLOW_SASL_FAILED_CLIENTS = 
"zookeeper.allowSaslFailedClients";
     public static final String SESSION_REQUIRE_CLIENT_SASL_AUTH = 
"zookeeper.sessionRequireClientSASLAuth";
     public static final String SASL_AUTH_SCHEME = "sasl";
@@ -100,6 +115,14 @@ public class ZooKeeperServer implements SessionExpirer, 
ServerStats.Provider {
         ZookeeperBanner.printBanner(LOG);
 
         Environment.logEnv("Server environment:", LOG);
+
+        enableEagerACLCheck = Boolean.getBoolean(ENABLE_EAGER_ACL_CHECK);
+        LOG.info(ENABLE_EAGER_ACL_CHECK + " = {}", enableEagerACLCheck);
+
+        skipACL = System.getProperty(SKIP_ACL, "no").equals("yes");
+        if (skipACL) {
+            LOG.info(SKIP_ACL + "==\"yes\", ACL checks will be skipped");
+        }
     }
 
     protected ZooKeeperServerBean jmxServerBean;
@@ -1648,4 +1671,211 @@ public class ZooKeeperServer implements SessionExpirer, 
ServerStats.Provider {
       response.accept("version", Version.getFullVersion());
       response.accept("server_state", stats.getServerState());
     }
+
+    /**
+     * Grant or deny authorization to an operation on a node as a function of:
+     * @param cnxn :    the server connection
+     * @param acl :     set of ACLs for the node
+     * @param perm :    the permission that the client is requesting
+     * @param ids :     the credentials supplied by the client
+     * @param path :    the ZNode path
+     * @param setAcls : for set ACL operations, the list of ACLs being set. 
Otherwise null.
+     */
+    public void checkACL(ServerCnxn cnxn, List<ACL> acl, int perm, List<Id> 
ids,
+                         String path, List<ACL> setAcls) throws 
KeeperException.NoAuthException {
+        if (skipACL) {
+            return;
+        }
+
+        LOG.debug("Permission requested: {} ", perm);
+        LOG.debug("ACLs for node: {}", acl);
+        LOG.debug("Client credentials: {}", ids);
+
+        if (acl == null || acl.size() == 0) {
+            return;
+        }
+        for (Id authId : ids) {
+            if (authId.getScheme().equals("super")) {
+                return;
+            }
+        }
+        for (ACL a : acl) {
+            Id id = a.getId();
+            if ((a.getPerms() & perm) != 0) {
+                if (id.getScheme().equals("world")
+                    && id.getId().equals("anyone")) {
+                    return;
+                }
+                ServerAuthenticationProvider ap = 
ProviderRegistry.getServerProvider(id
+                    .getScheme());
+                if (ap != null) {
+                    for (Id authId : ids) {
+                        if (authId.getScheme().equals(id.getScheme())
+                            && ap.matches(new 
ServerAuthenticationProvider.ServerObjs(this, cnxn),
+                            new ServerAuthenticationProvider.MatchValues(path, 
authId.getId(), id.getId(), perm, setAcls))) {
+                            return;
+                        }
+                    }
+                }
+            }
+        }
+        throw new KeeperException.NoAuthException();
+    }
+
+    /**
+     * Trim a path to get the immediate predecessor.
+     *
+     * @param path
+     * @return
+     * @throws KeeperException.BadArgumentsException
+     */
+    private String parentPath(String path)
+        throws KeeperException.BadArgumentsException {
+        int lastSlash = path.lastIndexOf('/');
+        if (lastSlash == -1 || path.indexOf('\0') != -1
+            || getZKDatabase().isSpecialPath(path)) {
+            throw new KeeperException.BadArgumentsException(path);
+        }
+        return lastSlash == 0 ? "/" : path.substring(0, lastSlash);
+    }
+
+    private String effectiveACLPath(Request request)
+        throws KeeperException.BadArgumentsException,
+        KeeperException.InvalidACLException {
+        boolean mustCheckACL = false;
+        String path = null;
+        List<ACL> acl = null;
+
+        switch (request.type) {
+            case OpCode.create:
+            case OpCode.create2: {
+                CreateRequest req = new CreateRequest();
+                if (buffer2Record(request.request, req)) {
+                    mustCheckACL = true;
+                    acl = req.getAcl();
+                    path = parentPath(req.getPath());
+                }
+                break;
+            }
+            case OpCode.delete: {
+                DeleteRequest req = new DeleteRequest();
+                if (buffer2Record(request.request, req)) {
+                    path = parentPath(req.getPath());
+                }
+                break;
+            }
+            case OpCode.setData: {
+                SetDataRequest req = new SetDataRequest();
+                if (buffer2Record(request.request, req)) {
+                    path = req.getPath();
+                }
+                break;
+            }
+            case OpCode.setACL: {
+                SetACLRequest req = new SetACLRequest();
+                if (buffer2Record(request.request, req)) {
+                    mustCheckACL = true;
+                    acl = req.getAcl();
+                    path = req.getPath();
+                }
+                break;
+            }
+        }
+
+        if (mustCheckACL) {
+            /* we ignore the extrapolated ACL returned by fixupACL because
+             * we only care about it being well-formed (and if it isn't, an
+             * exception will be raised).
+             */
+            PrepRequestProcessor.fixupACL(path, request.authInfo, acl);
+        }
+
+        return path;
+    }
+
+    private int effectiveACLPerms(Request request) {
+        switch (request.type) {
+            case OpCode.create: case OpCode.create2:
+                return ZooDefs.Perms.CREATE;
+            case OpCode.delete:
+                return ZooDefs.Perms.DELETE;
+            case OpCode.setData:
+                return ZooDefs.Perms.WRITE;
+            case OpCode.setACL:
+                return ZooDefs.Perms.ADMIN;
+            default:
+                return ZooDefs.Perms.ALL;
+        }
+    }
+
+    /**
+     * Check Write Requests for Potential Access Restrictions
+     * <p/>
+     * Before a request is being proposed to the quorum, lets check it
+     * against local ACLs. Non-write requests (read, session, etc.)
+     * are passed along. Invalid requests are sent a response.
+     * <p/>
+     * While we are at it, if the request will set an ACL: make sure it's
+     * a valid one.
+     *
+     * @param request
+     * @return true if request is permitted, false if not.
+     * @throws java.io.IOException
+     */
+    public boolean authWriteRequest(Request request) {
+        int err;
+        String pathToCheck;
+
+        if (!enableEagerACLCheck) {
+            return true;
+        }
+
+        err = KeeperException.Code.OK.intValue();
+
+        try {
+            pathToCheck = effectiveACLPath(request);
+            if (pathToCheck != null) {
+                checkACL(request.cnxn, zkDb.getACL(pathToCheck, null),
+                    effectiveACLPerms(request), request.authInfo, pathToCheck, 
null);
+            }
+        } catch (KeeperException.NoAuthException e) {
+            LOG.debug("Request failed ACL check", e);
+            err = e.code().intValue();
+        } catch (KeeperException.InvalidACLException e) {
+            LOG.debug("Request has an invalid ACL check", e);
+            err = e.code().intValue();
+        } catch (KeeperException.NoNodeException e) {
+            LOG.debug("ACL check against non-existent node: {}", 
e.getMessage());
+        } catch (KeeperException.BadArgumentsException e) {
+            LOG.debug("ACL check against illegal node path: {}", 
e.getMessage());
+        } catch (Throwable t) {
+            LOG.error("Uncaught exception in authWriteRequest with: ", t);
+            throw t;
+        } finally {
+            if (err != KeeperException.Code.OK.intValue()) {
+                /*  This request has a bad ACL, so we are dismissing it early. 
*/
+                decInProcess();
+                ReplyHeader rh = new ReplyHeader(request.cxid, 0, err);
+                try {
+                    request.cnxn.sendResponse(rh, null, null);
+                } catch (IOException e) {
+                    LOG.error("IOException : {}", e);
+                }
+            }
+        }
+
+        return err == KeeperException.Code.OK.intValue();
+    }
+
+    private boolean buffer2Record(ByteBuffer request, Record record) {
+        boolean rv = false;
+        try {
+            ByteBufferInputStream.byteBuffer2Record(request, record);
+            request.rewind();
+            rv = true;
+        } catch (IOException ex) {
+        }
+
+        return rv;
+    }
 }
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
index 2f345a8..ca96405 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
@@ -67,6 +67,12 @@ public class FollowerRequestProcessor extends 
ZooKeeperCriticalThread implements
                 if (request == Request.requestOfDeath) {
                     break;
                 }
+
+                // Screen quorum requests against ACLs first
+                if (!zks.authWriteRequest(request)) {
+                    continue;
+                }
+
                 // We want to queue the request to be processed before we 
submit
                 // the request to the leader so that we are ready to receive
                 // the response
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderRequestProcessor.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderRequestProcessor.java
index beb7c40..a013cdb 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderRequestProcessor.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderRequestProcessor.java
@@ -49,6 +49,11 @@ public class LeaderRequestProcessor implements 
RequestProcessor {
     @Override
     public void processRequest(Request request)
             throws RequestProcessorException {
+        // Screen quorum requests against ACLs first
+        if (!lzks.authWriteRequest(request)) {
+            return;
+        }
+
         // Check if this is a local session and we are trying to create
         // an ephemeral node, in which case we upgrade the session
         Request upgradeRequest = null;
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
index 85a5212..7db833f 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
@@ -76,6 +76,12 @@ public class ObserverRequestProcessor extends 
ZooKeeperCriticalThread implements
                 if (request == Request.requestOfDeath) {
                     break;
                 }
+
+                // Screen quorum requests against ACLs first
+                if (!zks.authWriteRequest(request)) {
+                    continue;
+                }
+
                 // We want to queue the request to be processed before we 
submit
                 // the request to the leader so that we are ready to receive
                 // the response
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java
 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java
index a277a1c..9a130f0 100644
--- 
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java
+++ 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java
@@ -27,6 +27,7 @@ import org.apache.zookeeper.proto.DeleteRequest;
 import org.apache.zookeeper.proto.SetDataRequest;
 import org.apache.zookeeper.test.ClientBase;
 import org.apache.zookeeper.test.QuorumUtil;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -57,6 +58,7 @@ public class PrepRequestProcessorMetricsTest extends 
ZKTestCase {
 
     @Before
     public void setup() {
+        System.setProperty(ZooKeeperServer.SKIP_ACL, "true");
         zks = spy(new ZooKeeperServer());
         zks.sessionTracker = mock(SessionTracker.class);
 
@@ -75,6 +77,11 @@ public class PrepRequestProcessorMetricsTest extends 
ZKTestCase {
         ServerMetrics.getMetrics().resetAll();
     }
 
+    @After
+    public void tearDown() throws Exception {
+        System.clearProperty(ZooKeeperServer.SKIP_ACL);
+    }
+
     private Request createRequest(Record record, int opCode) throws 
IOException {
         // encoding
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -113,7 +120,6 @@ public class PrepRequestProcessorMetricsTest extends 
ZKTestCase {
             return  
null;}).when(nextProcessor).processRequest(any(Request.class));
 
         PrepRequestProcessor prepRequestProcessor = new 
PrepRequestProcessor(zks, nextProcessor);
-        PrepRequestProcessor.skipACL = true;
 
         //setData will generate one change
         prepRequestProcessor.processRequest(createRequest("/foo", 
ZooDefs.OpCode.setData));
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EagerACLFilterTest.java
 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EagerACLFilterTest.java
new file mode 100644
index 0000000..0c869a2
--- /dev/null
+++ 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EagerACLFilterTest.java
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.TestableZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.apache.zookeeper.test.QuorumBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class EagerACLFilterTest extends QuorumBase {
+    protected boolean checkEnabled;
+    protected ServerState serverState;
+    protected final CountDownLatch callComplete = new CountDownLatch(1);
+    protected boolean complete = false;
+    protected final static String PARENT_PATH = "/foo";
+    protected final static String CHILD_PATH = "/foo/bar";
+    protected final static String AUTH_PROVIDER = "digest";
+    protected final static byte[] AUTH = "hello".getBytes();
+    protected final static byte[] AUTHB = "goodbye".getBytes();
+    protected final static byte[] DATA = "Hint Water".getBytes();
+    protected TestableZooKeeper zkClient;
+    protected TestableZooKeeper zkClientB;
+    protected QuorumPeer zkLeader;
+    protected ZooKeeperServer connectedServer;
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> data() {
+        return Arrays.asList(
+                new Object[][] {
+                        {ServerState.LEADING, true},
+                        {ServerState.LEADING, false},
+                        {ServerState.FOLLOWING, true},
+                        {ServerState.FOLLOWING, false},
+                        {ServerState.OBSERVING, true},
+                        {ServerState.OBSERVING, false}});
+    }
+
+    public EagerACLFilterTest(ServerState state, boolean checkEnabled) {
+        this.serverState = state;
+        this.checkEnabled = checkEnabled;
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        ensureCheck(checkEnabled);
+        CountdownWatcher clientWatch = new CountdownWatcher();
+        CountdownWatcher clientWatchB = new CountdownWatcher();
+        super.setUp(true);
+
+        String hostPort = getPeersMatching(serverState).split(",")[0];
+        int clientPort = Integer.parseInt(hostPort.split(":")[1]);
+
+        zkLeader = getPeerList().get(getLeaderIndex());
+        connectedServer = getPeerByClientPort(clientPort).getActiveServer();
+
+        zkClient = createClient(clientWatch, hostPort);
+        zkClientB = createClient(clientWatchB, hostPort);
+        zkClient.addAuthInfo(AUTH_PROVIDER, AUTH);
+        zkClientB.addAuthInfo(AUTH_PROVIDER, AUTHB);
+        clientWatch.waitForConnected(CONNECTION_TIMEOUT);
+        clientWatchB.waitForConnected(CONNECTION_TIMEOUT);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (zkClient != null) {
+            zkClient.close();
+        }
+
+        if (zkClientB != null) {
+            zkClientB.close();
+        }
+
+        super.tearDown();
+    }
+
+    private void ensureCheck(boolean enabled) {
+        if (enabled) {
+            System.setProperty(ZooKeeperServer.ENABLE_EAGER_ACL_CHECK, "true");
+        } else {
+            System.clearProperty(ZooKeeperServer.ENABLE_EAGER_ACL_CHECK);
+        }
+    }
+
+    private void assertTransactionState(String condition, long lastxid) {
+        String assertion =
+                String.format(
+                        "Server State: %s Check Enabled: %s %s",
+                        serverState,
+                        checkEnabled,
+                        condition);
+        if (checkEnabled) {
+            Assert.assertEquals(assertion, lastxid, 
zkLeader.getLastLoggedZxid());
+        }
+        else {
+            Assert.assertNotSame(assertion, lastxid, 
zkLeader.getLastLoggedZxid());
+        }
+    }
+
+    @Test
+    public void testCreateOK() throws Exception {
+        ensureCheck(true);
+        zkClient.create(PARENT_PATH, DATA, Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+        zkClientB.create(CHILD_PATH, DATA, Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+
+        Assert.assertEquals("OutstandingRequests not decremented",
+                            0,
+                            connectedServer.getInProcess());
+    }
+
+    @Test
+    public void testCreate2OK() throws Exception {
+        zkClient.create(PARENT_PATH, DATA, Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT, null);
+        zkClientB.create(CHILD_PATH, DATA, Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT, null);
+
+        Assert.assertEquals("OutstandingRequests not decremented",
+                            0,
+                            connectedServer.getInProcess());
+    }
+
+    @Test
+    public void testCreateFail() throws Exception {
+        zkClient.create(PARENT_PATH, DATA, Ids.CREATOR_ALL_ACL, 
CreateMode.PERSISTENT);
+        long lastxid = zkLeader.getLastLoggedZxid();
+        try {
+            zkClientB.create(CHILD_PATH, DATA, Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+        }
+        catch (KeeperException.NoAuthException e) {}
+
+        Assert.assertEquals("OutstandingRequests not decremented",
+                            0,
+                            connectedServer.getInProcess());
+
+        assertTransactionState("Transaction state on Leader after failed 
create", lastxid);
+    }
+
+    @Test
+    public void testCreate2Fail() throws Exception {
+        zkClient.create(PARENT_PATH, DATA, Ids.CREATOR_ALL_ACL, 
CreateMode.PERSISTENT, null);
+        long lastxid = zkLeader.getLastLoggedZxid();
+        try {
+            zkClientB.create(CHILD_PATH, DATA, Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT, null);
+        }
+        catch (KeeperException.NoAuthException e) {}
+
+        Assert.assertEquals("OutstandingRequests not decremented",
+                            0,
+                            connectedServer.getInProcess());
+
+        assertTransactionState("Transaction state on Leader after failed 
create2", lastxid);
+    }
+
+    @Test
+    public void testDeleteOK() throws Exception {
+        zkClient.create(PARENT_PATH, DATA, Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+        zkClientB.delete(PARENT_PATH, -1);
+
+        Assert.assertEquals("OutstandingRequests not decremented",
+                            0,
+                            connectedServer.getInProcess());
+    }
+
+    @Test
+    public void testDeleteFail() throws Exception {
+        zkClient.create(PARENT_PATH, DATA, Ids.CREATOR_ALL_ACL, 
CreateMode.PERSISTENT, null);
+        zkClient.create(CHILD_PATH, DATA, Ids.CREATOR_ALL_ACL, 
CreateMode.PERSISTENT, null);
+        long lastxid = zkLeader.getLastLoggedZxid();
+        try {
+            zkClientB.delete(CHILD_PATH, -1);
+        }
+        catch (KeeperException.NoAuthException e) {}
+
+        Assert.assertEquals("OutstandingRequests not decremented",
+                            0,
+                            connectedServer.getInProcess());
+
+        assertTransactionState("Transaction state on Leader after failed 
delete", lastxid);
+    }
+
+    @Test
+    public void testSetDataOK() throws Exception {
+        zkClient.create(PARENT_PATH, null, Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT, null);
+        zkClientB.setData(PARENT_PATH, DATA, -1);
+    }
+
+    @Test
+    public void testSetDataFail() throws Exception {
+        zkClient.create(PARENT_PATH, null, Ids.CREATOR_ALL_ACL, 
CreateMode.PERSISTENT, null);
+        long lastxid = zkLeader.getLastLoggedZxid();
+        try {
+            zkClientB.setData(PARENT_PATH, DATA, -1);
+        }
+        catch (KeeperException.NoAuthException e) {}
+
+        Assert.assertEquals("OutstandingRequests not decremented",
+                            0,
+                            connectedServer.getInProcess());
+
+        assertTransactionState("Transaction state on Leader after failed 
setData", lastxid);
+    }
+
+    @Test
+    public void testSetACLOK() throws Exception {
+        zkClient.create(PARENT_PATH, null, Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT, null);
+        zkClientB.setACL(PARENT_PATH, Ids.READ_ACL_UNSAFE, -1);
+
+        Assert.assertEquals("OutstandingRequests not decremented",
+                            0,
+                            connectedServer.getInProcess());
+    }
+
+    @Test
+    public void testSetACLFail() throws Exception {
+        zkClient.create(PARENT_PATH, null, Ids.CREATOR_ALL_ACL, 
CreateMode.PERSISTENT, null);
+        long lastxid = zkLeader.getLastLoggedZxid();
+        try {
+            zkClientB.setACL(PARENT_PATH, Ids.READ_ACL_UNSAFE, -1);
+        }
+        catch (KeeperException.NoAuthException e) {}
+
+        Assert.assertEquals("OutstandingRequests not decremented",
+                            0,
+                            connectedServer.getInProcess());
+
+        assertTransactionState("Transaction state on Leader after failed 
setACL", lastxid);
+    }
+
+    @Test
+    public void testBadACL() throws Exception {
+        CountdownWatcher cw = new CountdownWatcher();
+        TestableZooKeeper zk = createClient(cw, getPeersMatching(serverState));
+        long lastxid;
+
+        cw.waitForConnected(CONNECTION_TIMEOUT);
+
+        lastxid = zkLeader.getLastLoggedZxid();
+
+        try {
+            zk.create("/acltest", new byte[0], Ids.CREATOR_ALL_ACL,
+                      CreateMode.PERSISTENT);
+            Assert.fail("Should have received an invalid acl error");
+        } catch (KeeperException.InvalidACLException e) {}
+
+        Assert.assertEquals("OutstandingRequests not decremented",
+                            0,
+                            connectedServer.getInProcess());
+
+        assertTransactionState("zxid after invalid ACL", lastxid);
+    }
+}
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java 
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java
index a2b0615..83c5d43 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java
@@ -287,6 +287,15 @@ public class QuorumBase extends ClientBase {
         return peers;
     }
 
+    public QuorumPeer getPeerByClientPort(int clientPort) {
+        for (QuorumPeer p : getPeerList()) {
+            if (p.getClientAddress().getPort() == clientPort) {
+                return p;
+            }
+        }
+        return null;
+    }
+
     public void setupServers() throws IOException {
         setupServer(1);
         setupServer(2);

Reply via email to