This is an automated email from the ASF dual-hosted git repository.

hanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ca627b  ZOOKEEPER-3503: Add server side large request throttling
1ca627b is described below

commit 1ca627b5a3105d80ed4d851c6e9f1a1e2ac7d64a
Author: Jie Huang <jiehu...@fb.com>
AuthorDate: Mon Sep 16 14:26:05 2019 -0700

    ZOOKEEPER-3503: Add server side large request throttling
    
    With this change, a ZooKeeper server has two new settings:
    zookeeper.largeRequestThreshold
    zookeeper.largeRequestMaxBytes
    
    Any request that is larger than largeRequestThreshold is considered a large 
request, and will only be allowed if the number of bytes associated with 
inflight large requests is currently below the largeRequestMaxBytes limit.
    
    This check is performed in the connection layer based on the length header 
of a request, before allocating the necessary byte buffer and reading data off 
the TCP socket. This ensures the limit is enforced before allocating data 
that's ultimately just going to be discarded.
    
    Whenever a large request is allowed, its size is added to an atomic counter 
which represents the number of large request bytes inflight and this counter is 
the one tested against the max. Whenever a large request is completed or 
dropped, the counter is decremented as necessary.
    
    Author: Jie Huang <jiehu...@fb.com>
    Author: Joseph Blomstedt <j...@fb.com>
    
    Reviewers: Michael Han <h...@apache.org>, Norbert Kalmar 
<nkal...@apache.org>
    
    Closes #1051 from jhuan31/ZOOKEEPER-3503
---
 .../src/main/resources/markdown/zookeeperAdmin.md  |  10 ++
 .../zookeeper/server/FinalRequestProcessor.java    |   3 +
 .../org/apache/zookeeper/server/NIOServerCnxn.java |   2 +
 .../apache/zookeeper/server/NettyServerCnxn.java   |   2 +
 .../java/org/apache/zookeeper/server/Request.java  |  10 ++
 .../apache/zookeeper/server/RequestThrottler.java  |   3 +
 .../org/apache/zookeeper/server/ServerMetrics.java |   2 +
 .../apache/zookeeper/server/ZooKeeperServer.java   | 122 +++++++++++++++++++++
 .../zookeeper/server/ZooKeeperServerBean.java      |  20 ++++
 .../zookeeper/server/ZooKeeperServerMXBean.java    |   6 +
 .../zookeeper/server/RequestThrottlerTest.java     |  69 ++++++++++++
 11 files changed, 249 insertions(+)

diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md 
b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index 7c37a53..327ac56 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -957,6 +957,16 @@ property, when available, is noted below.
     consistency check during recovery process.
     Default value is false.
 
+* *largeRequestMaxBytes* :
+    (Java system property: **zookeeper.largeRequestMaxBytes**)
+    **New in 3.6.0:**
+    The maximum number of bytes of all inflight large requests. The connection 
will be closed if an incoming large request causes the limit to be exceeded. The default 
is 100 * 1024 * 1024.
+
+* *largeRequestThreshold* :
+    (Java system property: **zookeeper.largeRequestThreshold**)
+    **New in 3.6.0:**
+    The size threshold after which a request is considered a large request. If 
it is -1, then all requests are considered small, effectively turning off large 
request throttling. The default is -1.
+
 <a name="sc_clusterOptions"></a>
 
 #### Cluster Options
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
index faccc36..dcb3d26 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
@@ -152,7 +152,10 @@ public class FinalRequestProcessor implements 
RequestProcessor {
         long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
 
         String lastOp = "NA";
+        // Notify ZooKeeperServer that the request has finished so that it can
+        // update any request accounting/throttling limits
         zks.decInProcess();
+        zks.requestFinished(request);
         Code err = Code.OK;
         Record rsp = null;
         String path = null;
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
index 9c0af0e..fc69a52 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
@@ -544,6 +544,8 @@ public class NIOServerCnxn extends ServerCnxn {
         if (!isZKServerRunning()) {
             throw new IOException("ZooKeeperServer not running");
         }
+        // checkRequestSize will throw IOException if request is rejected
+        zkServer.checkRequestSizeWhenReceivingMessage(len);
         incomingBuffer = ByteBuffer.allocate(len);
         return true;
     }
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
index aeb9b25..ac03190 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
@@ -507,6 +507,8 @@ public class NettyServerCnxn extends ServerCnxn {
                         if (len < 0 || len > BinaryInputArchive.maxBuffer) {
                             throw new IOException("Len error " + len);
                         }
+                        // checkRequestSize will throw IOException if request 
is rejected
+                        zkServer.checkRequestSizeWhenReceivingMessage(len);
                         bb = ByteBuffer.allocate(len);
                     }
                 }
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
index f0d6109..295e983 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
@@ -109,6 +109,8 @@ public class Request {
      */
     private boolean isLocalSession = false;
 
+    private int largeRequestSize = -1;
+
     public boolean isLocalSession() {
         return isLocalSession;
     }
@@ -117,6 +119,14 @@ public class Request {
         this.isLocalSession = isLocalSession;
     }
 
+    public void setLargeRequestSize(int size) {
+        largeRequestSize = size;
+    }
+
+    public int getLargeRequestSize() {
+        return largeRequestSize;
+    }
+
     public Object getOwner() {
         return owner;
     }
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java
index a2aea44..880ac49 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java
@@ -219,6 +219,9 @@ public class RequestThrottler extends 
ZooKeeperCriticalThread {
             // Note: this will close the connection
             conn.setInvalid();
         }
+        // Notify ZooKeeperServer that the request has finished so that it can
+        // update any request accounting/throttling limits.
+        zks.requestFinished(request);
     }
 
     public void submitRequest(Request request) {
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index 93f5ee4..1f9855c 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -223,6 +223,7 @@ public final class ServerMetrics {
         STALE_REQUESTS_DROPPED = 
metricsContext.getCounter("stale_requests_dropped");
         STALE_REPLIES = metricsContext.getCounter("stale_replies");
         REQUEST_THROTTLE_WAIT_COUNT = 
metricsContext.getCounter("request_throttle_wait_count");
+        LARGE_REQUESTS_REJECTED = 
metricsContext.getCounter("large_requests_rejected");
 
         NETTY_QUEUED_BUFFER = 
metricsContext.getSummary("netty_queued_buffer_capacity", DetailLevel.BASIC);
 
@@ -425,6 +426,7 @@ public final class ServerMetrics {
     public final Counter STALE_REQUESTS_DROPPED;
     public final Counter STALE_REPLIES;
     public final Counter REQUEST_THROTTLE_WAIT_COUNT;
+    public final Counter LARGE_REQUESTS_REJECTED;
 
     public final Summary NETTY_QUEUED_BUFFER;
 
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index e982cd1..3d6c375 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -244,6 +244,35 @@ public class ZooKeeperServer implements SessionExpirer, 
ServerStats.Provider {
     private RequestThrottler requestThrottler;
     public static final String SNAP_COUNT = "zookeeper.snapCount";
 
+    /**
+     * This setting sets a limit on the total number of bytes of large requests that
+     * can be inflight and is designed to prevent ZooKeeper from accepting
+     * too many large requests such that the JVM runs out of usable heap and
+     * ultimately crashes.
+     *
+     * The limit is enforced by the {@link #checkRequestSizeWhenReceivingMessage(int)}
+     * method which is called by the connection layer ({@link NIOServerCnxn},
+     * {@link NettyServerCnxn}) before allocating a byte buffer and pulling
+     * data off the TCP socket. The limit is then checked again by the
+     * ZooKeeper server in {@link #processPacket(ServerCnxn, ByteBuffer)} which
+     * also atomically updates {@link #currentLargeRequestBytes}. The request is
+     * then marked as a large request, with the request size stored in the 
Request
+     * object so that it can later be decremented from {@link 
#currentLargeRequestBytes}.
+     *
+     * When a request is completed or dropped, the relevant code path calls the
+     * {@link #requestFinished(Request)} method which performs the decrement if
+     * needed.
+     */
+    private volatile int largeRequestMaxBytes = 100 * 1024 * 1024;
+
+    /**
+     * The size threshold after which a request is considered a large request
+     * and is checked against the large request byte limit.
+     */
+    private volatile int largeRequestThreshold = -1;
+
+    private final AtomicInteger currentLargeRequestBytes = new 
AtomicInteger(0);
+
     void removeCnxn(ServerCnxn cnxn) {
         zkDb.removeCnxn(cnxn);
     }
@@ -285,6 +314,8 @@ public class ZooKeeperServer implements SessionExpirer, 
ServerStats.Provider {
 
         this.requestPathMetricsCollector = new RequestPathMetricsCollector();
 
+        this.initLargeRequestThrottlingSettings();
+
         LOG.info("Created server with tickTime " + tickTime
                  + " minSessionTimeout " + getMinSessionTimeout()
                  + " maxSessionTimeout " + getMaxSessionTimeout()
@@ -1047,14 +1078,20 @@ public class ZooKeeperServer implements SessionExpirer, 
ServerStats.Provider {
                 }
             } else {
                 LOG.warn("Received packet at server of unknown type " + 
si.type);
+                // Update request accounting/throttling limits
+                requestFinished(si);
                 new UnimplementedRequestProcessor().processRequest(si);
             }
         } catch (MissingSessionException e) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Dropping request: " + e.getMessage());
             }
+            // Update request accounting/throttling limits
+            requestFinished(si);
         } catch (RequestProcessorException e) {
             LOG.error("Unable to process request:" + e.getMessage(), e);
+            // Update request accounting/throttling limits
+            requestFinished(si);
         }
     }
 
@@ -1380,6 +1417,85 @@ public class ZooKeeperServer implements SessionExpirer, 
ServerStats.Provider {
         maxBatchSize = size;
     }
 
+    private void initLargeRequestThrottlingSettings() {
+        
setLargeRequestMaxBytes(Integer.getInteger("zookeeper.largeRequestMaxBytes", 
largeRequestMaxBytes));
+        
setLargeRequestThreshold(Integer.getInteger("zookeeper.largeRequestThreshold", 
-1));
+    }
+
+    public int getLargeRequestMaxBytes() {
+        return largeRequestMaxBytes;
+    }
+
+    public void setLargeRequestMaxBytes(int bytes) {
+        if (bytes <= 0) {
+            LOG.warn("Invalid max bytes for all large requests {}. It should 
be a positive number.", bytes);
+            LOG.warn("Will not change the setting. The max bytes stay at {}", 
largeRequestMaxBytes);
+        } else {
+            largeRequestMaxBytes = bytes;
+            LOG.info("The max bytes for all large requests are set to {}", 
largeRequestMaxBytes);
+        }
+    }
+
+    public int getLargeRequestThreshold() {
+        return largeRequestThreshold;
+    }
+
+    public void setLargeRequestThreshold(int threshold) {
+        if (threshold == 0 || threshold < -1) {
+            LOG.warn("Invalid large request threshold {}. It should be -1 or 
positive. Setting to -1 ", threshold);
+            largeRequestThreshold = -1;
+        } else {
+            largeRequestThreshold = threshold;
+            LOG.info("The large request threshold is set to {}", 
largeRequestThreshold);
+        }
+    }
+
+    public int getLargeRequestBytes() {
+        return currentLargeRequestBytes.get();
+    }
+
+    private boolean isLargeRequest(int length) {
+        // The large request limit is disabled when threshold is -1
+        if (largeRequestThreshold == -1) {
+            return false;
+        }
+        return length > largeRequestThreshold;
+    }
+
+    public boolean checkRequestSizeWhenReceivingMessage(int length) throws 
IOException {
+        if (!isLargeRequest(length)) {
+            return true;
+        }
+        if (currentLargeRequestBytes.get() + length <= largeRequestMaxBytes) {
+            return true;
+        } else {
+            ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1);
+            throw new IOException("Rejecting large request");
+        }
+
+    }
+
+    private boolean checkRequestSizeWhenMessageReceived(int length) throws 
IOException {
+        if (!isLargeRequest(length)) {
+            return true;
+        }
+
+        int bytes = currentLargeRequestBytes.addAndGet(length);
+        if (bytes > largeRequestMaxBytes) {
+            currentLargeRequestBytes.addAndGet(-length);
+            ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1);
+            throw new IOException("Rejecting large request");
+        }
+        return true;
+    }
+
+    public void requestFinished(Request request) {
+        int largeRequestLength = request.getLargeRequestSize();
+        if (largeRequestLength != -1) {
+            currentLargeRequestBytes.addAndGet(-largeRequestLength);
+        }
+    }
+
     public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) 
throws IOException {
         // We have the request, now process and setup for next
         InputStream bais = new ByteBufferInputStream(incomingBuffer);
@@ -1451,6 +1567,12 @@ public class ZooKeeperServer implements SessionExpirer, 
ServerStats.Provider {
                 cnxn.disableRecv();
             } else {
                 Request si = new Request(cnxn, cnxn.getSessionId(), 
h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
+                int length = incomingBuffer.limit();
+                if (isLargeRequest(length)) {
+                    // checkRequestSize will throw IOException if request is 
rejected
+                    checkRequestSizeWhenMessageReceived(length);
+                    si.setLargeRequestSize(length);
+                }
                 si.setOwner(ServerCnxn.me);
                 // Always treat packet from the client as a possible
                 // local request.
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
index 08d4291..1783197 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
@@ -383,4 +383,24 @@ public class ZooKeeperServerBean implements 
ZooKeeperServerMXBean, ZKMBeanInfo {
         Request.setStaleConnectionCheck(check);
     }
 
+
+    ///////////////////////////////////////////////////////////////////////////
+
+    public int getLargeRequestMaxBytes() {
+        return zks.getLargeRequestMaxBytes();
+    }
+
+    public void setLargeRequestMaxBytes(int bytes) {
+        zks.setLargeRequestMaxBytes(bytes);
+    }
+
+    ///////////////////////////////////////////////////////////////////////////
+
+    public int getLargeRequestThreshold() {
+        return zks.getLargeRequestThreshold();
+    }
+
+    public void setLargeRequestThreshold(int threshold) {
+        zks.setLargeRequestThreshold(threshold);
+    }
 }
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
index ac15768..cf46675 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
@@ -142,6 +142,12 @@ public interface ZooKeeperServerMXBean {
     boolean getRequestStaleConnectionCheck();
     void setRequestStaleConnectionCheck(boolean check);
 
+    int getLargeRequestMaxBytes();
+    void setLargeRequestMaxBytes(int bytes);
+
+    int getLargeRequestThreshold();
+    void setLargeRequestThreshold(int threshold);
+
     /**
      * Reset packet and latency statistics
      */
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java
 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java
index db40d86..3afe81c 100644
--- 
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java
+++ 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java
@@ -26,7 +26,9 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.ZKTestCase;
 import org.apache.zookeeper.ZooDefs;
@@ -34,6 +36,7 @@ import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.metrics.MetricsUtils;
 import org.apache.zookeeper.test.ClientBase;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -59,9 +62,16 @@ public class RequestThrottlerTest extends ZKTestCase {
     // latch to make sure all requests entered the pipeline
     CountDownLatch entered = null;
 
+    // latch to make sure requests finished the pipeline
+    CountDownLatch finished = null;
+
+    CountDownLatch disconnected = null;
+
     ZooKeeperServer zks = null;
     ServerCnxnFactory f = null;
     ZooKeeper zk = null;
+    int connectionLossCount = 0;
+
 
     @Before
     public void setup() throws Exception {
@@ -122,6 +132,13 @@ public class RequestThrottlerTest extends ZKTestCase {
             super.submitRequest(si);
         }
 
+        @Override
+        public void requestFinished(Request request) {
+            if (null != finished){
+                finished.countDown();
+            }
+            super.requestFinished(request);
+        }
     }
 
     class TestPrepRequestProcessor extends PrepRequestProcessor {
@@ -238,4 +255,56 @@ public class RequestThrottlerTest extends ZKTestCase {
         assertEquals(1, (long) metrics.get("stale_requests_dropped"));
     }
 
+    @Test
+    public void testLargeRequestThrottling() throws Exception {
+        ServerMetrics.getMetrics().resetAll();
+
+        AsyncCallback.StringCallback createCallback = (rc, path, ctx, name) -> 
{
+            if (KeeperException.Code.get(rc) == 
KeeperException.Code.CONNECTIONLOSS) {
+                disconnected.countDown();
+                connectionLossCount++;
+            }
+        };
+
+        // we allow five requests in the pipeline
+        RequestThrottler.setMaxRequests(5);
+
+        // enable large request throttling
+        zks.setLargeRequestThreshold(150);
+        zks.setLargeRequestMaxBytes(400);
+
+        // no requests can go through the pipeline unless we raise the latch
+        resumeProcess = new CountDownLatch(1);
+        // the connection will be closed when large requests exceed the limit
+        // we can't use the submitted latch because requests after close won't 
be submitted
+        disconnected = new CountDownLatch(TOTAL_REQUESTS);
+
+        // the total length of the request is about 170-180 bytes, so only two 
requests are allowed
+        byte[] data = new byte[100];
+
+        // send 5 requests asynchronously
+        for (int i = 0; i < TOTAL_REQUESTS; i++) {
+            zk.create("/request_throttle_test- " + i , data,
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, 
createCallback, null);
+        }
+
+        // make sure the server received all 5 requests
+        disconnected.await(5, TimeUnit.SECONDS);
+        Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
+
+        // but only two requests can get into the pipeline because they are 
large requests
+        // the connection will be closed
+        Assert.assertEquals(2L, (long) 
metrics.get("prep_processor_request_queued"));
+        Assert.assertEquals(1L, (long) metrics.get("large_requests_rejected"));
+        Assert.assertEquals(5, connectionLossCount);
+
+        finished = new CountDownLatch(2);
+        // let the requests go through the pipeline
+        resumeProcess.countDown();
+        finished.await(5, TimeUnit.SECONDS);
+
+        // when the two requests finish, they are stale because the connection 
is closed already
+        metrics = MetricsUtils.currentServerMetrics();
+        Assert.assertEquals(2, (long) metrics.get("stale_replies"));
+    }
 }

Reply via email to