This is an automated email from the ASF dual-hosted git repository. toulmean pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/main by this push: new 8af91d1 Add a limiter so we can throttle access new a4051cd Merge pull request #324 from atoulme/add_concurrent_requests_throttling 8af91d1 is described below commit 8af91d125bd01a4a7a184f20262d30a665856d43 Author: Antoine Toulme <anto...@lunar-ocean.com> AuthorDate: Thu Jul 29 14:55:38 2021 -0700 Add a limiter so we can throttle access --- dependency-versions.gradle | 1 + jsonrpc/build.gradle | 1 + .../tuweni/jsonrpc/methods/MethodsHandler.kt | 26 ++++++++++++++ .../tuweni/jsonrpc/methods/MethodsHandlerTest.kt | 42 ++++++++++++++++++++++ 4 files changed, 70 insertions(+) diff --git a/dependency-versions.gradle b/dependency-versions.gradle index 6f08571..a3841a9 100644 --- a/dependency-versions.gradle +++ b/dependency-versions.gradle @@ -24,6 +24,7 @@ dependencyManagement { dependency('com.google.guava:guava:27.0.1-jre') dependency('com.h2database:h2:1.4.197') dependency('com.jolbox:bonecp:0.8.0.RELEASE') + dependency('com.netflix.concurrency-limits:concurrency-limits-core:0.3.6') dependency('com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0') dependency('com.opentable.components:otj-pg-embedded:0.13.3') dependency('com.squareup.okhttp3:okhttp:3.12.0') diff --git a/jsonrpc/build.gradle b/jsonrpc/build.gradle index e2d2da2..e09379c 100644 --- a/jsonrpc/build.gradle +++ b/jsonrpc/build.gradle @@ -16,6 +16,7 @@ dependencies { implementation 'org.slf4j:slf4j-api' implementation 'com.fasterxml.jackson.core:jackson-databind' implementation "com.google.guava:guava" + implementation 'com.netflix.concurrency-limits:concurrency-limits-core' implementation "org.jetbrains.kotlin:kotlin-stdlib" implementation 'io.opentelemetry:opentelemetry-api-metrics' implementation 'io.opentelemetry:opentelemetry-sdk-metrics' diff --git a/jsonrpc/src/main/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandler.kt b/jsonrpc/src/main/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandler.kt index 9dc92a0..3419b98 100644 --- a/jsonrpc/src/main/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandler.kt +++ b/jsonrpc/src/main/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandler.kt @@ -16,6 +16,8 @@ */ package org.apache.tuweni.jsonrpc.methods +import com.netflix.concurrency.limits.limit.FixedLimit +import com.netflix.concurrency.limits.limiter.SimpleLimiter import io.opentelemetry.api.metrics.LongCounter import io.opentelemetry.api.metrics.common.Labels import org.apache.tuweni.eth.JSONRPCError @@ -66,6 +68,30 @@ class MethodAllowListHandler(private val allowedMethods: List<String>, private v } } +class ThrottlingHandler(private val threshold: Int, private val delegateHandler: (JSONRPCRequest) -> JSONRPCResponse) { + + private val limiter: SimpleLimiter<Void> = SimpleLimiter + .newBuilder() + .limit(FixedLimit.of(threshold)) + .build() + + fun handleRequest(request: JSONRPCRequest): JSONRPCResponse { + val listener = limiter.acquire(null) + if (listener.isEmpty) { + return JSONRPCResponse(id = request.id, error = JSONRPCError(code = -32000, message = "Too many requests")) + } else { + try { + val response = delegateHandler(request) + listener.get().onSuccess() + return response + } catch (t: Throwable) { + listener.get().onDropped() + throw RuntimeException(t) + } + } + } +} + // TODO DelegateHandler - choose from a number of handlers to see which to delegate to. // TODO FilterHandler - filter incoming requests per allowlist // TODO CachingHandler - cache some incoming requests diff --git a/jsonrpc/src/test/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandlerTest.kt b/jsonrpc/src/test/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandlerTest.kt index c9778f8..9441a10 100644 --- a/jsonrpc/src/test/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandlerTest.kt +++ b/jsonrpc/src/test/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandlerTest.kt @@ -20,6 +20,9 @@ import io.opentelemetry.sdk.metrics.SdkMeterProvider import io.opentelemetry.sdk.metrics.export.IntervalMetricReader import io.opentelemetry.sdk.metrics.export.MetricProducer import io.opentelemetry.sdk.metrics.testing.InMemoryMetricExporter +import kotlinx.coroutines.async +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking import org.apache.tuweni.eth.JSONRPCError import org.apache.tuweni.eth.JSONRPCRequest import org.apache.tuweni.eth.JSONRPCResponse @@ -123,3 +126,42 @@ class MethodAllowListHandlerTest { assertEquals("Method not enabled", respContents.message) } } + +class ThrottlingHandlerTest { + + @Test + fun testThrottling(): Unit = runBlocking { + val handler = ThrottlingHandler(4) { + runBlocking { + delay(500) + JSONRPCResponse(id = 1) + } + } + async { + val response = handler.handleRequest(JSONRPCRequest(2, "foo", arrayOf())) + assertEquals(1, response.id) + } + async { + val response = handler.handleRequest(JSONRPCRequest(3, "foo", arrayOf())) + assertEquals(1, response.id) + } + async { + val response = handler.handleRequest(JSONRPCRequest(4, "foo", arrayOf())) + assertEquals(1, response.id) + } + async { + val response = handler.handleRequest(JSONRPCRequest(5, "foo", arrayOf())) + assertEquals(1, response.id) + } + async { + delay(200) + val response = handler.handleRequest(JSONRPCRequest(6, "foo", arrayOf())) + assertEquals(-32000, response.error?.code) + } + async { + delay(1000) + val response = handler.handleRequest(JSONRPCRequest(7, "foo", arrayOf())) + assertEquals(1, response.id) + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@tuweni.apache.org For additional commands, e-mail: commits-h...@tuweni.apache.org