This is an automated email from the ASF dual-hosted git repository. hanm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push: new 1ca627b ZOOKEEPER-3503: Add server side large request throttling 1ca627b is described below commit 1ca627b5a3105d80ed4d851c6e9f1a1e2ac7d64a Author: Jie Huang <jiehu...@fb.com> AuthorDate: Mon Sep 16 14:26:05 2019 -0700 ZOOKEEPER-3503: Add server side large request throttling With this change, a ZooKeeper server has two new settings: zookeeper.largeRequestThreshold zookeeper.largeRequestMaxBytes Any request that is larger than largeRequestThreshold is considered a large request, and will only be allowed if the number of bytes associated with inflight large requests is currently below the largeRequestMaxBytes limit. This check is performed in the connection layer based on the length header of a request, before allocating the necessary byte buffer and reading data off the TCP socket. This ensures the limit is enforced before allocating data that's ultimately just going to be discarded. Whenever a large request is allowed, its size is added to an atomic counter which represents the number of large request bytes inflight and this counter is the one tested against the max. Whenever a large request is completed or dropped, the counter is decremented as necessary. 
Author: Jie Huang <jiehu...@fb.com> Author: Joseph Blomstedt <j...@fb.com> Reviewers: Michael Han <h...@apache.org>, Norbert Kalmar <nkal...@apache.org> Closes #1051 from jhuan31/ZOOKEEPER-3503 --- .../src/main/resources/markdown/zookeeperAdmin.md | 10 ++ .../zookeeper/server/FinalRequestProcessor.java | 3 + .../org/apache/zookeeper/server/NIOServerCnxn.java | 2 + .../apache/zookeeper/server/NettyServerCnxn.java | 2 + .../java/org/apache/zookeeper/server/Request.java | 10 ++ .../apache/zookeeper/server/RequestThrottler.java | 3 + .../org/apache/zookeeper/server/ServerMetrics.java | 2 + .../apache/zookeeper/server/ZooKeeperServer.java | 122 +++++++++++++++++++++ .../zookeeper/server/ZooKeeperServerBean.java | 20 ++++ .../zookeeper/server/ZooKeeperServerMXBean.java | 6 + .../zookeeper/server/RequestThrottlerTest.java | 69 ++++++++++++ 11 files changed, 249 insertions(+) diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md index 7c37a53..327ac56 100644 --- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md +++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md @@ -957,6 +957,16 @@ property, when available, is noted below. consistency check during recovery process. Default value is false. +* *largeRequestMaxBytes* : + (Java system property: **zookeeper.largeRequestMaxBytes**) + **New in 3.6.0:** + The maximum number of bytes of all inflight large request. The connection will be closed if a coming large request causes the limit exceeded. The default is 100 * 1024 * 1024. + +* *largeRequestThreshold* : + (Java system property: **zookeeper.largeRequestThreshold**) + **New in 3.6.0:** + The size threshold after which a request is considered a large request. If it is -1, then all requests are considered small, effectively turning off large request throttling. The default is -1. 
+ <a name="sc_clusterOptions"></a> #### Cluster Options diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java index faccc36..dcb3d26 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java @@ -152,7 +152,10 @@ public class FinalRequestProcessor implements RequestProcessor { long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid(); String lastOp = "NA"; + // Notify ZooKeeperServer that the request has finished so that it can + // update any request accounting/throttling limits zks.decInProcess(); + zks.requestFinished(request); Code err = Code.OK; Record rsp = null; String path = null; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java index 9c0af0e..fc69a52 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java @@ -544,6 +544,8 @@ public class NIOServerCnxn extends ServerCnxn { if (!isZKServerRunning()) { throw new IOException("ZooKeeperServer not running"); } + // checkRequestSize will throw IOException if request is rejected + zkServer.checkRequestSizeWhenReceivingMessage(len); incomingBuffer = ByteBuffer.allocate(len); return true; } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java index aeb9b25..ac03190 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java @@ -507,6 +507,8 @@ public class NettyServerCnxn 
extends ServerCnxn { if (len < 0 || len > BinaryInputArchive.maxBuffer) { throw new IOException("Len error " + len); } + // checkRequestSize will throw IOException if request is rejected + zkServer.checkRequestSizeWhenReceivingMessage(len); bb = ByteBuffer.allocate(len); } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java index f0d6109..295e983 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java @@ -109,6 +109,8 @@ public class Request { */ private boolean isLocalSession = false; + private int largeRequestSize = -1; + public boolean isLocalSession() { return isLocalSession; } @@ -117,6 +119,14 @@ public class Request { this.isLocalSession = isLocalSession; } + public void setLargeRequestSize(int size) { + largeRequestSize = size; + } + + public int getLargeRequestSize() { + return largeRequestSize; + } + public Object getOwner() { return owner; } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java index a2aea44..880ac49 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java @@ -219,6 +219,9 @@ public class RequestThrottler extends ZooKeeperCriticalThread { // Note: this will close the connection conn.setInvalid(); } + // Notify ZooKeeperServer that the request has finished so that it can + // update any request accounting/throttling limits. 
+ zks.requestFinished(request); } public void submitRequest(Request request) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java index 93f5ee4..1f9855c 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java @@ -223,6 +223,7 @@ public final class ServerMetrics { STALE_REQUESTS_DROPPED = metricsContext.getCounter("stale_requests_dropped"); STALE_REPLIES = metricsContext.getCounter("stale_replies"); REQUEST_THROTTLE_WAIT_COUNT = metricsContext.getCounter("request_throttle_wait_count"); + LARGE_REQUESTS_REJECTED = metricsContext.getCounter("large_requests_rejected"); NETTY_QUEUED_BUFFER = metricsContext.getSummary("netty_queued_buffer_capacity", DetailLevel.BASIC); @@ -425,6 +426,7 @@ public final class ServerMetrics { public final Counter STALE_REQUESTS_DROPPED; public final Counter STALE_REPLIES; public final Counter REQUEST_THROTTLE_WAIT_COUNT; + public final Counter LARGE_REQUESTS_REJECTED; public final Summary NETTY_QUEUED_BUFFER; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index e982cd1..3d6c375 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -244,6 +244,35 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { private RequestThrottler requestThrottler; public static final String SNAP_COUNT = "zookeeper.snapCount"; + /** + * This setting sets a limit on the total number of large requests that + * can be inflight and is designed to prevent ZooKeeper from accepting + * too many large requests such that the JVM runs out of usable heap and + * 
ultimately crashes. + * + * The limit is enforced by the {@link checkRequestSize(int, boolean)} + * method which is called by the connection layer ({@link NIOServerCnxn}, + * {@link NettyServerCnxn}) before allocating a byte buffer and pulling + * data off the TCP socket. The limit is then checked again by the + * ZooKeeper server in {@link processPacket(ServerCnxn, ByteBuffer)} which + * also atomically updates {@link currentLargeRequestBytes}. The request is + * then marked as a large request, with the request size stored in the Request + * object so that it can later be decremented from {@link currentLargeRequestsBytes}. + * + * When a request is completed or dropped, the relevant code path calls the + * {@link requestFinished(Request)} method which performs the decrement if + * needed. + */ + private volatile int largeRequestMaxBytes = 100 * 1024 * 1024; + + /** + * The size threshold after which a request is considered a large request + * and is checked against the large request byte limit. 
+ */ + private volatile int largeRequestThreshold = -1; + + private final AtomicInteger currentLargeRequestBytes = new AtomicInteger(0); + void removeCnxn(ServerCnxn cnxn) { zkDb.removeCnxn(cnxn); } @@ -285,6 +314,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { this.requestPathMetricsCollector = new RequestPathMetricsCollector(); + this.initLargeRequestThrottlingSettings(); + LOG.info("Created server with tickTime " + tickTime + " minSessionTimeout " + getMinSessionTimeout() + " maxSessionTimeout " + getMaxSessionTimeout() @@ -1047,14 +1078,20 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { } } else { LOG.warn("Received packet at server of unknown type " + si.type); + // Update request accounting/throttling limits + requestFinished(si); new UnimplementedRequestProcessor().processRequest(si); } } catch (MissingSessionException e) { if (LOG.isDebugEnabled()) { LOG.debug("Dropping request: " + e.getMessage()); } + // Update request accounting/throttling limits + requestFinished(si); } catch (RequestProcessorException e) { LOG.error("Unable to process request:" + e.getMessage(), e); + // Update request accounting/throttling limits + requestFinished(si); } } @@ -1380,6 +1417,85 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { maxBatchSize = size; } + private void initLargeRequestThrottlingSettings() { + setLargeRequestMaxBytes(Integer.getInteger("zookeeper.largeRequestMaxBytes", largeRequestMaxBytes)); + setLargeRequestThreshold(Integer.getInteger("zookeeper.largeRequestThreshold", -1)); + } + + public int getLargeRequestMaxBytes() { + return largeRequestMaxBytes; + } + + public void setLargeRequestMaxBytes(int bytes) { + if (bytes <= 0) { + LOG.warn("Invalid max bytes for all large requests {}. It should be a positive number.", bytes); + LOG.warn("Will not change the setting. 
The max bytes stay at {}", largeRequestMaxBytes); + } else { + largeRequestMaxBytes = bytes; + LOG.info("The max bytes for all large requests are set to {}", largeRequestMaxBytes); + } + } + + public int getLargeRequestThreshold() { + return largeRequestThreshold; + } + + public void setLargeRequestThreshold(int threshold) { + if (threshold == 0 || threshold < -1) { + LOG.warn("Invalid large request threshold {}. It should be -1 or positive. Setting to -1 ", threshold); + largeRequestThreshold = -1; + } else { + largeRequestThreshold = threshold; + LOG.info("The large request threshold is set to {}", largeRequestThreshold); + } + } + + public int getLargeRequestBytes() { + return currentLargeRequestBytes.get(); + } + + private boolean isLargeRequest(int length) { + // The large request limit is disabled when threshold is -1 + if (largeRequestThreshold == -1) { + return false; + } + return length > largeRequestThreshold; + } + + public boolean checkRequestSizeWhenReceivingMessage(int length) throws IOException { + if (!isLargeRequest(length)) { + return true; + } + if (currentLargeRequestBytes.get() + length <= largeRequestMaxBytes) { + return true; + } else { + ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1); + throw new IOException("Rejecting large request"); + } + + } + + private boolean checkRequestSizeWhenMessageReceived(int length) throws IOException { + if (!isLargeRequest(length)) { + return true; + } + + int bytes = currentLargeRequestBytes.addAndGet(length); + if (bytes > largeRequestMaxBytes) { + currentLargeRequestBytes.addAndGet(-length); + ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1); + throw new IOException("Rejecting large request"); + } + return true; + } + + public void requestFinished(Request request) { + int largeRequestLength = request.getLargeRequestSize(); + if (largeRequestLength != -1) { + currentLargeRequestBytes.addAndGet(-largeRequestLength); + } + } + public void processPacket(ServerCnxn cnxn, ByteBuffer 
incomingBuffer) throws IOException { // We have the request, now process and setup for next InputStream bais = new ByteBufferInputStream(incomingBuffer); @@ -1451,6 +1567,12 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { cnxn.disableRecv(); } else { Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo()); + int length = incomingBuffer.limit(); + if (isLargeRequest(length)) { + // checkRequestSize will throw IOException if request is rejected + checkRequestSizeWhenMessageReceived(length); + si.setLargeRequestSize(length); + } si.setOwner(ServerCnxn.me); // Always treat packet from the client as a possible // local request. diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java index 08d4291..1783197 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java @@ -383,4 +383,24 @@ public class ZooKeeperServerBean implements ZooKeeperServerMXBean, ZKMBeanInfo { Request.setStaleConnectionCheck(check); } + + /////////////////////////////////////////////////////////////////////////// + + public int getLargeRequestMaxBytes() { + return zks.getLargeRequestMaxBytes(); + } + + public void setLargeRequestMaxBytes(int bytes) { + zks.setLargeRequestMaxBytes(bytes); + } + + /////////////////////////////////////////////////////////////////////////// + + public int getLargeRequestThreshold() { + return zks.getLargeRequestThreshold(); + } + + public void setLargeRequestThreshold(int threshold) { + zks.setLargeRequestThreshold(threshold); + } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java index 
ac15768..cf46675 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java @@ -142,6 +142,12 @@ public interface ZooKeeperServerMXBean { boolean getRequestStaleConnectionCheck(); void setRequestStaleConnectionCheck(boolean check); + int getLargeRequestMaxBytes(); + void setLargeRequestMaxBytes(int bytes); + + int getLargeRequestThreshold(); + void setLargeRequestThreshold(int threshold); + /** * Reset packet and latency statistics */ diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java index db40d86..3afe81c 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java @@ -26,7 +26,9 @@ import java.io.IOException; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.PortAssignment; import org.apache.zookeeper.ZKTestCase; import org.apache.zookeeper.ZooDefs; @@ -34,6 +36,7 @@ import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.metrics.MetricsUtils; import org.apache.zookeeper.test.ClientBase; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; @@ -59,9 +62,16 @@ public class RequestThrottlerTest extends ZKTestCase { // latch to make sure all requests entered the pipeline CountDownLatch entered = null; + // latch to make sure requests finished the pipeline + CountDownLatch finished = null; + + CountDownLatch disconnected = null; + ZooKeeperServer zks = null; ServerCnxnFactory f = null; 
ZooKeeper zk = null; + int connectionLossCount = 0; + @Before public void setup() throws Exception { @@ -122,6 +132,13 @@ public class RequestThrottlerTest extends ZKTestCase { super.submitRequest(si); } + @Override + public void requestFinished(Request request) { + if (null != finished){ + finished.countDown(); + } + super.requestFinished(request); + } } class TestPrepRequestProcessor extends PrepRequestProcessor { @@ -238,4 +255,56 @@ public class RequestThrottlerTest extends ZKTestCase { assertEquals(1, (long) metrics.get("stale_requests_dropped")); } + @Test + public void testLargeRequestThrottling() throws Exception { + ServerMetrics.getMetrics().resetAll(); + + AsyncCallback.StringCallback createCallback = (rc, path, ctx, name) -> { + if (KeeperException.Code.get(rc) == KeeperException.Code.CONNECTIONLOSS) { + disconnected.countDown(); + connectionLossCount++; + } + }; + + // we allow five requests in the pipeline + RequestThrottler.setMaxRequests(5); + + // enable large request throttling + zks.setLargeRequestThreshold(150); + zks.setLargeRequestMaxBytes(400); + + // no requests can go through the pipeline unless we raise the latch + resumeProcess = new CountDownLatch(1); + // the connection will be close when large requests exceed the limit + // we can't use the submitted latch because requests after close won't be submitted + disconnected = new CountDownLatch(TOTAL_REQUESTS); + + // the total length of the request is about 170-180 bytes, so only two requests are allowed + byte[] data = new byte[100]; + + // send 5 requests asynchronously + for (int i = 0; i < TOTAL_REQUESTS; i++) { + zk.create("/request_throttle_test- " + i , data, + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, createCallback, null); + } + + // make sure the server received all 5 requests + disconnected.await(5, TimeUnit.SECONDS); + Map<String, Object> metrics = MetricsUtils.currentServerMetrics(); + + // but only two requests can get into the pipeline because they are large 
requests + // the connection will be closed + Assert.assertEquals(2L, (long) metrics.get("prep_processor_request_queued")); + Assert.assertEquals(1L, (long) metrics.get("large_requests_rejected")); + Assert.assertEquals(5, connectionLossCount); + + finished = new CountDownLatch(2); + // let the requests go through the pipeline + resumeProcess.countDown(); + finished.await(5, TimeUnit.SECONDS); + + // when the two requests finish, they are stale because the connection is closed already + metrics = MetricsUtils.currentServerMetrics(); + Assert.assertEquals(2, (long) metrics.get("stale_replies")); + } }