This is an automated email from the ASF dual-hosted git repository. toulmean pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/main by this push: new e748c26 Add a new handler to poll endpoint directly new 330795f Merge pull request #340 from atoulme/add_polling_cache_handler e748c26 is described below commit e748c26e28a337a659d8a489d2c20b5cde96fc9d Author: Antoine Toulme <anto...@lunar-ocean.com> AuthorDate: Thu Aug 19 05:57:53 2021 -0700 Add a new handler to poll endpoint directly --- .../kotlin/org/apache/tuweni/eth/JSONRPCRequest.kt | 30 ++++++++- .../tuweni/jsonrpc/methods/MethodsHandler.kt | 71 +++++++++++++++++++++- .../tuweni/jsonrpc/methods/MethodsHandlerTest.kt | 31 +++++++++- 3 files changed, 127 insertions(+), 5 deletions(-) diff --git a/eth/src/main/kotlin/org/apache/tuweni/eth/JSONRPCRequest.kt b/eth/src/main/kotlin/org/apache/tuweni/eth/JSONRPCRequest.kt index 5305818..20d5804 100644 --- a/eth/src/main/kotlin/org/apache/tuweni/eth/JSONRPCRequest.kt +++ b/eth/src/main/kotlin/org/apache/tuweni/eth/JSONRPCRequest.kt @@ -19,10 +19,38 @@ package org.apache.tuweni.eth import com.fasterxml.jackson.annotation.JsonIgnoreProperties import com.fasterxml.jackson.annotation.JsonProperty +/** + * JSONRPCRequest represents a JSON-RPC request to a JSON-RPC service. + */ @JsonIgnoreProperties(ignoreUnknown = true) data class JSONRPCRequest( @JsonProperty("id") val id: Int, @JsonProperty("method") val method: String, @JsonProperty("params") val params: Array<String>, @JsonProperty("jsonrpc") val jsonrpc: String = "2.0" -) +) { + + companion object { + /** + * Deserialize a string into a JSON-RPC request. + * The request is incomplete, as no id is set. + * + * The serialized form follows this formula: + * <method>|<params, joined by comma> + * + * Example: + * - eth_getBlockByNumber|latest,true + */ + fun deserialize(serialized: String): JSONRPCRequest { + val segments = serialized.split("|") + return JSONRPCRequest(id = 0, method = segments[0], params = segments[1].split(",").toTypedArray()) + } + } + + override fun equals(other: Any?) = other is JSONRPCRequest && this.method == other.method && params.contentEquals(other.params) + override fun hashCode() = 31 * method.hashCode() + params.contentHashCode() + + fun serializeRequest(): String { + return this.method + "|" + this.params.joinToString(",") + } +} diff --git a/jsonrpc/src/main/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandler.kt b/jsonrpc/src/main/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandler.kt index b881d76..45a18c3 100644 --- a/jsonrpc/src/main/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandler.kt +++ b/jsonrpc/src/main/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandler.kt @@ -20,12 +20,18 @@ import com.netflix.concurrency.limits.limit.FixedLimit import com.netflix.concurrency.limits.limiter.SimpleLimiter import io.opentelemetry.api.metrics.LongCounter import io.opentelemetry.api.metrics.common.Labels +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import org.apache.tuweni.eth.JSONRPCRequest import org.apache.tuweni.eth.JSONRPCResponse import org.apache.tuweni.eth.methodNotEnabled import org.apache.tuweni.eth.methodNotFound import org.apache.tuweni.eth.tooManyRequests import org.apache.tuweni.kv.KeyValueStore +import org.slf4j.LoggerFactory +import kotlin.coroutines.CoroutineContext class MethodsRouter(val methodsMap: Map<String, suspend (JSONRPCRequest) -> JSONRPCResponse>) { @@ -123,7 +129,7 @@ class CachingHandler( if (!found) { return delegateHandler(request) } else { - val serializedRequest = serializeRequest(request) + val serializedRequest = request.serializeRequest() val response = cacheStore.get(serializedRequest) return if (response == null) { cacheMissCounter.add(1) @@ -138,8 +144,67 @@ class CachingHandler( } } } +} + +class CachingPollingHandler( + private val cachedRequests: List<JSONRPCRequest>, + private val pollPeriodMillis: Long, + private val cacheStore: KeyValueStore<JSONRPCRequest, JSONRPCResponse>, + private val cacheHitCounter: LongCounter, + private val cacheMissCounter: LongCounter, + override val coroutineContext: CoroutineContext = Dispatchers.Default, + private val delegateHandler: suspend (JSONRPCRequest) -> JSONRPCResponse, +) : CoroutineScope { + + companion object { + private val logger = LoggerFactory.getLogger(CachingPollingHandler::class.java) + } + + init { + poll() + } + + private fun poll() { + launch { + try { + var id = 1337 + for (cachedRequest in cachedRequests) { + val newResponse = delegateHandler(cachedRequest.copy(id = id)) + id++ + if (newResponse.error == null) { + cacheStore.put(cachedRequest, newResponse) + } else { + logger.warn("{}, got error:\n{}", cachedRequest, newResponse.error) + } + } + } catch (e: Exception) { + logger.error("Error polling JSON-RPC endpoint", e) + } + delay(pollPeriodMillis) + poll() + } + } - private fun serializeRequest(request: JSONRPCRequest): String { - return request.method + "|" + request.params.joinToString(",") + suspend fun handleRequest(request: JSONRPCRequest): JSONRPCResponse { + var found = false + if (cachedRequests.contains(request)) { + found = true + } + if (!found) { + return delegateHandler(request) + } else { + val response = cacheStore.get(request) + if (response == null) { + cacheMissCounter.add(1) + val newResponse = delegateHandler(request) + if (newResponse.error == null) { + cacheStore.put(request, newResponse) + } + return newResponse + } else { + cacheHitCounter.add(1) + return response.copy(id = request.id) + } + } } } diff --git a/jsonrpc/src/test/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandlerTest.kt b/jsonrpc/src/test/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandlerTest.kt index 8624a03..ef88102 100644 --- a/jsonrpc/src/test/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandlerTest.kt +++ b/jsonrpc/src/test/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandlerTest.kt @@ -176,7 +176,7 @@ class CachingHandlerTest { val meterSdk = SdkMeterProvider.builder().build() val meter = meterSdk.get("handler") val handler = CachingHandler(listOf("foo"), kv, meter.longCounterBuilder("foo").build(), meter.longCounterBuilder("bar").build()) { - if (it.params.size > 0) { + if (it.params.isNotEmpty()) { JSONRPCResponse(id = 1, error = JSONRPCError(1234, "")) } else { JSONRPCResponse(id = 1) @@ -193,3 +193,32 @@ class CachingHandlerTest { assertEquals(1, map.size) } } + +class CachingPollingHandlerTest { + + @Test + fun testCache() = runBlocking { + val map = HashMap<JSONRPCRequest, JSONRPCResponse>() + val kv = MapKeyValueStore.open(map) + val meterSdk = SdkMeterProvider.builder().build() + val meter = meterSdk.get("handler") + val handler = CachingPollingHandler(listOf(JSONRPCRequest(1, "foo", arrayOf())), 1000, kv, meter.longCounterBuilder("foo").build(), meter.longCounterBuilder("bar").build()) { + if (it.params.isNotEmpty()) { + JSONRPCResponse(id = 1, error = JSONRPCError(1234, "")) + } else { + JSONRPCResponse(id = 1) + } + } + delay(500) + assertEquals(1, map.size) + handler.handleRequest(JSONRPCRequest(id = 1, method = "foo", params = arrayOf())) + assertEquals(1, map.size) + handler.handleRequest(JSONRPCRequest(id = 1, method = "bar", params = arrayOf())) + assertEquals(1, map.size) + handler.handleRequest(JSONRPCRequest(id = 1, method = "foo", params = arrayOf())) + assertEquals(1, map.size) + val errorResp = handler.handleRequest(JSONRPCRequest(id = 1, method = "foo", params = arrayOf("bleh"))) + assertEquals(1, map.size) + assertNotNull(errorResp.error) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@tuweni.apache.org For additional commands, e-mail: commits-h...@tuweni.apache.org