This is an automated email from the ASF dual-hosted git repository.
adutra pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new 1071aa24 Refactor RateLimiter (#534)
1071aa24 is described below
commit 1071aa24d2c2bad3ff0f9840a6b2572bc29e4e6a
Author: Alexandre Dutra <[email protected]>
AuthorDate: Tue Dec 17 10:56:02 2024 +0100
Refactor RateLimiter (#534)
This PR is motivated by the following issues:
* `RateLimiter` implementations are typically request-scoped, because they
are sensitive to the realm context,
but keeping a cache of token buckets per realm must be delegated to a
separate, application-scoped bean: this
is solved by introducing a new `TokenBucketFactory` bean.
* `TokenBucketRateLimiter` should NOT implement `RateLimiter`, because that
would result in two available beans
for this interface: the one selected by configuration
(`realm-token-bucket` or `no-op`), and this one which
is always available, thus triggering an unresolvable bean error when
using the Quarkus runtime. Instead,
this class is now just a general-purpose Token Bucket implementation.
This PR has one user-facing change:
* The configuration options for the `realm-token-bucket` rate limiter were
moved from the `rateLimiter` section to
the `tokenBucketFactory` section.
---
.../config/PolarisApplicationConfig.java | 16 ++++++
.../main/resources/META-INF/hk2-locator/default | 10 ++--
...ateLimiter.java => MockTokenBucketFactory.java} | 22 +++-----
.../ratelimiter/RateLimiterFilterTest.java | 9 ++--
.../RealmTokenBucketRateLimiterTest.java | 28 ++++++----
.../ratelimiter/TokenBucketRateLimiterTest.java | 34 ++++++++-----
.../test/resources/META-INF/hk2-locator/default | 6 +--
.../resources/polaris-server-integrationtest.yml | 6 ++-
polaris-server.yml | 8 +++
.../ratelimiter/DefaultTokenBucketFactory.java | 59 ++++++++++++++++++++++
.../service/ratelimiter/NoOpRateLimiter.java | 2 +-
.../polaris/service/ratelimiter/RateLimiter.java | 2 +-
.../service/ratelimiter/RateLimiterFilter.java | 2 +-
.../ratelimiter/RealmTokenBucketRateLimiter.java | 47 ++++-------------
...okenBucketRateLimiter.java => TokenBucket.java} | 11 ++--
.../service/ratelimiter/TokenBucketFactory.java | 26 ++--------
16 files changed, 170 insertions(+), 118 deletions(-)
diff --git
a/dropwizard/service/src/main/java/org/apache/polaris/service/dropwizard/config/PolarisApplicationConfig.java
b/dropwizard/service/src/main/java/org/apache/polaris/service/dropwizard/config/PolarisApplicationConfig.java
index 34623ee7..e5f94e1c 100644
---
a/dropwizard/service/src/main/java/org/apache/polaris/service/dropwizard/config/PolarisApplicationConfig.java
+++
b/dropwizard/service/src/main/java/org/apache/polaris/service/dropwizard/config/PolarisApplicationConfig.java
@@ -52,6 +52,7 @@ import
org.apache.polaris.service.config.TaskHandlerConfiguration;
import org.apache.polaris.service.context.CallContextResolver;
import org.apache.polaris.service.context.RealmContextResolver;
import org.apache.polaris.service.ratelimiter.RateLimiter;
+import org.apache.polaris.service.ratelimiter.TokenBucketFactory;
import
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
import org.apache.polaris.service.types.TokenType;
import org.glassfish.hk2.api.Factory;
@@ -90,6 +91,7 @@ public class PolarisApplicationConfig extends Configuration {
private String awsSecretKey;
private FileIOFactory fileIOFactory;
private RateLimiter rateLimiter;
+ private TokenBucketFactory tokenBucketFactory;
private TokenBrokerFactory tokenBrokerFactory;
private AccessToken gcpAccessToken;
@@ -144,6 +146,9 @@ public class PolarisApplicationConfig extends Configuration
{
bindFactory(SupplierFactory.create(serviceLocator,
config::getRateLimiter))
.to(RateLimiter.class)
.ranked(OVERRIDE_BINDING_RANK);
+ bindFactory(SupplierFactory.create(serviceLocator,
config::getTokenBucketFactory))
+ .to(TokenBucketFactory.class)
+ .ranked(OVERRIDE_BINDING_RANK);
}
};
}
@@ -332,6 +337,17 @@ public class PolarisApplicationConfig extends
Configuration {
this.rateLimiter = rateLimiter;
}
+ @JsonProperty("tokenBucketFactory")
+ private TokenBucketFactory getTokenBucketFactory() {
+ return tokenBucketFactory;
+ }
+
+ @JsonProperty("tokenBucketFactory")
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include =
JsonTypeInfo.As.PROPERTY, property = "type")
+ public void setTokenBucketFactory(@Nullable TokenBucketFactory
tokenBucketFactory) {
+ this.tokenBucketFactory = tokenBucketFactory;
+ }
+
public void setTaskHandler(TaskHandlerConfiguration taskHandler) {
this.taskHandler = taskHandler;
}
diff --git a/dropwizard/service/src/main/resources/META-INF/hk2-locator/default
b/dropwizard/service/src/main/resources/META-INF/hk2-locator/default
index 5c935985..1771a6ad 100644
--- a/dropwizard/service/src/main/resources/META-INF/hk2-locator/default
+++ b/dropwizard/service/src/main/resources/META-INF/hk2-locator/default
@@ -81,13 +81,13 @@
contract={org.apache.polaris.service.ratelimiter.RateLimiter}
name=no-op
qualifier={io.smallrye.common.annotation.Identifier}
-[org.apache.polaris.service.ratelimiter.TokenBucketRateLimiter]S
-contract={org.apache.polaris.service.ratelimiter.RateLimiter}
-name=token-bucket
-qualifier={io.smallrye.common.annotation.Identifier}
-
[org.apache.polaris.service.ratelimiter.RealmTokenBucketRateLimiter]S
contract={org.apache.polaris.service.ratelimiter.RateLimiter}
name=realm-token-bucket
qualifier={io.smallrye.common.annotation.Identifier}
+[org.apache.polaris.service.ratelimiter.DefaultTokenBucketFactory]S
+contract={org.apache.polaris.service.ratelimiter.TokenBucketFactory}
+name=default
+qualifier={io.smallrye.common.annotation.Identifier}
+
diff --git
a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/MockRealmTokenBucketRateLimiter.java
b/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/MockTokenBucketFactory.java
similarity index 68%
rename from
dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/MockRealmTokenBucketRateLimiter.java
rename to
dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/MockTokenBucketFactory.java
index 6bd4ad5c..6516e7c7 100644
---
a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/MockRealmTokenBucketRateLimiter.java
+++
b/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/MockTokenBucketFactory.java
@@ -21,26 +21,20 @@ package org.apache.polaris.service.dropwizard.ratelimiter;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.smallrye.common.annotation.Identifier;
-import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
-import org.apache.polaris.service.ratelimiter.RealmTokenBucketRateLimiter;
+import org.apache.polaris.service.ratelimiter.DefaultTokenBucketFactory;
import org.threeten.extra.MutableClock;
-/** RealmTokenBucketRateLimiter with a mock clock */
-@Identifier("mock-realm-token-bucket")
-public class MockRealmTokenBucketRateLimiter extends
RealmTokenBucketRateLimiter {
+/** TokenBucketFactory with a mock clock */
+@Identifier("mock")
+public class MockTokenBucketFactory extends DefaultTokenBucketFactory {
public static MutableClock CLOCK = MutableClock.of(Instant.now(),
ZoneOffset.UTC);
@JsonCreator
- public MockRealmTokenBucketRateLimiter(
- @JsonProperty("requestsPerSecond") final long requestsPerSecond,
- @JsonProperty("windowSeconds") final long windowSeconds) {
- super(requestsPerSecond, windowSeconds);
- }
-
- @Override
- protected Clock getClock() {
- return CLOCK;
+ public MockTokenBucketFactory(
+ @JsonProperty("requestsPerSecond") long requestsPerSecond,
+ @JsonProperty("windowSeconds") long windowSeconds) {
+ super(requestsPerSecond, windowSeconds, CLOCK);
}
}
diff --git
a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/RateLimiterFilterTest.java
b/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/RateLimiterFilterTest.java
index 4689bdee..a2dc8370 100644
---
a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/RateLimiterFilterTest.java
+++
b/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/RateLimiterFilterTest.java
@@ -65,14 +65,15 @@ public class RateLimiterFilterTest {
"server.applicationConnectors[0].port",
"0"), // Bind to random port to support parallelism
ConfigOverride.config("server.adminConnectors[0].port", "0"),
- ConfigOverride.config("rateLimiter.type", "mock-realm-token-bucket"),
+ ConfigOverride.config("tokenBucketFactory.type", "mock"),
ConfigOverride.config(
- "rateLimiter.requestsPerSecond",
String.valueOf(REQUESTS_PER_SECOND)),
- ConfigOverride.config("rateLimiter.windowSeconds",
String.valueOf(WINDOW_SECONDS)));
+ "tokenBucketFactory.requestsPerSecond",
String.valueOf(REQUESTS_PER_SECOND)),
+ ConfigOverride.config(
+ "tokenBucketFactory.windowSeconds",
String.valueOf(WINDOW_SECONDS)));
private static String userToken;
private static String realm;
- private static MutableClock clock = MockRealmTokenBucketRateLimiter.CLOCK;
+ private static MutableClock clock = MockTokenBucketFactory.CLOCK;
@BeforeAll
public static void setup(
diff --git
a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/RealmTokenBucketRateLimiterTest.java
b/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/RealmTokenBucketRateLimiterTest.java
index 9f0aad5c..efeee130 100644
---
a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/RealmTokenBucketRateLimiterTest.java
+++
b/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/RealmTokenBucketRateLimiterTest.java
@@ -18,41 +18,49 @@
*/
package org.apache.polaris.service.dropwizard.ratelimiter;
+import static
org.apache.polaris.service.dropwizard.ratelimiter.MockTokenBucketFactory.CLOCK;
+
import java.time.Duration;
import org.apache.polaris.core.context.CallContext;
-import org.apache.polaris.service.ratelimiter.RateLimiter;
+import org.apache.polaris.service.ratelimiter.DefaultTokenBucketFactory;
+import org.apache.polaris.service.ratelimiter.RealmTokenBucketRateLimiter;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-import org.threeten.extra.MutableClock;
/** Main unit test class for TokenBucketRateLimiter */
public class RealmTokenBucketRateLimiterTest {
@Test
void testDifferentBucketsDontTouch() {
- RateLimiter rateLimiter = new MockRealmTokenBucketRateLimiter(10, 10);
- RateLimitResultAsserter asserter = new
RateLimitResultAsserter(rateLimiter);
- MutableClock clock = MockRealmTokenBucketRateLimiter.CLOCK;
+ RealmTokenBucketRateLimiter rateLimiter = new
RealmTokenBucketRateLimiter();
+ rateLimiter.setTokenBucketFactory(new DefaultTokenBucketFactory(10, 10,
CLOCK));
for (int i = 0; i < 202; i++) {
String realm = (i % 2 == 0) ? "realm1" : "realm2";
CallContext.setCurrentContext(CallContext.of(() -> realm, null));
if (i < 200) {
- asserter.canAcquire(1);
+ Assertions.assertTrue(rateLimiter.canProceed());
} else {
- asserter.cantAcquire();
+ assertCannotProceed(rateLimiter);
}
}
- clock.add(Duration.ofSeconds(1));
+ CLOCK.add(Duration.ofSeconds(1));
for (int i = 0; i < 22; i++) {
String realm = (i % 2 == 0) ? "realm1" : "realm2";
CallContext.setCurrentContext(CallContext.of(() -> realm, null));
if (i < 20) {
- asserter.canAcquire(1);
+ Assertions.assertTrue(rateLimiter.canProceed());
} else {
- asserter.cantAcquire();
+ assertCannotProceed(rateLimiter);
}
}
}
+
+ private void assertCannotProceed(RealmTokenBucketRateLimiter rateLimiter) {
+ for (int i = 0; i < 5; i++) {
+ Assertions.assertFalse(rateLimiter.canProceed());
+ }
+ }
}
diff --git
a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/TokenBucketRateLimiterTest.java
b/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/TokenBucketRateLimiterTest.java
index 454e9706..472b79ac 100644
---
a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/TokenBucketRateLimiterTest.java
+++
b/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/TokenBucketRateLimiterTest.java
@@ -26,7 +26,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.polaris.service.ratelimiter.TokenBucketRateLimiter;
+import org.apache.polaris.service.ratelimiter.TokenBucket;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.threeten.extra.MutableClock;
@@ -38,19 +38,18 @@ public class TokenBucketRateLimiterTest {
MutableClock clock = MutableClock.of(Instant.now(), ZoneOffset.UTC);
clock.add(Duration.ofSeconds(5));
- RateLimitResultAsserter asserter =
- new RateLimitResultAsserter(new TokenBucketRateLimiter(10, 100,
clock));
+ TokenBucket tokenBucket = new TokenBucket(10, 100, clock);
- asserter.canAcquire(100);
- asserter.cantAcquire();
+ assertCanAcquire(tokenBucket, 100);
+ assertCannotAcquire(tokenBucket);
clock.add(Duration.ofSeconds(1));
- asserter.canAcquire(10);
- asserter.cantAcquire();
+ assertCanAcquire(tokenBucket, 10);
+ assertCannotAcquire(tokenBucket);
clock.add(Duration.ofSeconds(10));
- asserter.canAcquire(100);
- asserter.cantAcquire();
+ assertCanAcquire(tokenBucket, 100);
+ assertCannotAcquire(tokenBucket);
}
/**
@@ -63,9 +62,8 @@ public class TokenBucketRateLimiterTest {
int numTasks = 50000;
int tokensPerSecond = 10; // Can be anything above 0
- TokenBucketRateLimiter rl =
- new TokenBucketRateLimiter(
- tokensPerSecond, maxTokens, Clock.fixed(Instant.now(),
ZoneOffset.UTC));
+ TokenBucket rl =
+ new TokenBucket(tokensPerSecond, maxTokens, Clock.fixed(Instant.now(),
ZoneOffset.UTC));
AtomicInteger numAcquired = new AtomicInteger();
CountDownLatch startLatch = new CountDownLatch(numTasks);
CountDownLatch endLatch = new CountDownLatch(numTasks);
@@ -95,4 +93,16 @@ public class TokenBucketRateLimiterTest {
endLatch.await();
Assertions.assertEquals(maxTokens, numAcquired.get());
}
+
+ private void assertCanAcquire(TokenBucket tokenBucket, int times) {
+ for (int i = 0; i < times; i++) {
+ Assertions.assertTrue(tokenBucket.tryAcquire());
+ }
+ }
+
+ private void assertCannotAcquire(TokenBucket tokenBucket) {
+ for (int i = 0; i < 5; i++) {
+ Assertions.assertFalse(tokenBucket.tryAcquire());
+ }
+ }
}
diff --git a/dropwizard/service/src/test/resources/META-INF/hk2-locator/default
b/dropwizard/service/src/test/resources/META-INF/hk2-locator/default
index 07bd4da0..92b32e5a 100644
--- a/dropwizard/service/src/test/resources/META-INF/hk2-locator/default
+++ b/dropwizard/service/src/test/resources/META-INF/hk2-locator/default
@@ -21,7 +21,7 @@ contract={org.apache.polaris.service.catalog.io.FileIOFactory}
name=test
qualifier={io.smallrye.common.annotation.Identifier}
-[org.apache.polaris.service.dropwizard.ratelimiter.MockRealmTokenBucketRateLimiter]S
-contract={org.apache.polaris.service.ratelimiter.RateLimiter}
-name=mock-realm-token-bucket
+[org.apache.polaris.service.dropwizard.ratelimiter.MockTokenBucketFactory]S
+contract={org.apache.polaris.service.ratelimiter.TokenBucketFactory}
+name=mock
qualifier={io.smallrye.common.annotation.Identifier}
diff --git
a/dropwizard/service/src/test/resources/polaris-server-integrationtest.yml
b/dropwizard/service/src/test/resources/polaris-server-integrationtest.yml
index 76db1ec1..9d8f770e 100644
--- a/dropwizard/service/src/test/resources/polaris-server-integrationtest.yml
+++ b/dropwizard/service/src/test/resources/polaris-server-integrationtest.yml
@@ -153,8 +153,12 @@ logging:
# Limits the size of request bodies sent to Polaris. -1 means no limit.
maxRequestBodyBytes: 1000000
-# Limits the request rate per realm
+# Limits the request rate per realm.
rateLimiter:
type: realm-token-bucket
+
+# The token bucket factory to use when using the realm-token-bucket rate
limiter.
+tokenBucketFactory:
+ type: default
requestsPerSecond: 9999
windowSeconds: 10
diff --git a/polaris-server.yml b/polaris-server.yml
index e88307bf..0bc9f785 100644
--- a/polaris-server.yml
+++ b/polaris-server.yml
@@ -172,3 +172,11 @@ maxRequestBodyBytes: -1
# Optional, not specifying a "rateLimiter" section also means no rate limiter
rateLimiter:
type: no-op
+ # Uncomment to use the realm-token-bucket rate limiter
+ # type: realm-token-bucket
+
+# The token bucket factory to use when using the realm-token-bucket rate
limiter.
+tokenBucketFactory:
+ type: default
+ requestsPerSecond: 9999
+ windowSeconds: 10
diff --git
a/service/common/src/main/java/org/apache/polaris/service/ratelimiter/DefaultTokenBucketFactory.java
b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/DefaultTokenBucketFactory.java
new file mode 100644
index 00000000..8393e8f0
--- /dev/null
+++
b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/DefaultTokenBucketFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.ratelimiter;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.smallrye.common.annotation.Identifier;
+import java.time.Clock;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.polaris.core.context.RealmContext;
+
+@Identifier("default")
+public class DefaultTokenBucketFactory implements TokenBucketFactory {
+
+ private final long requestsPerSecond;
+ private final long windowSeconds;
+ private final Clock clock;
+ private final Map<String, TokenBucket> perRealmBuckets = new
ConcurrentHashMap<>();
+
+ @JsonCreator
+ public DefaultTokenBucketFactory(
+ @JsonProperty("requestsPerSecond") long requestsPerSecond,
+ @JsonProperty("windowSeconds") long windowSeconds) {
+ this(requestsPerSecond, windowSeconds, Clock.systemUTC());
+ }
+
+ public DefaultTokenBucketFactory(long requestsPerSecond, long windowSeconds,
Clock clock) {
+ this.requestsPerSecond = requestsPerSecond;
+ this.windowSeconds = windowSeconds;
+ this.clock = clock;
+ }
+
+ @Override
+ public TokenBucket getOrCreateTokenBucket(RealmContext realmContext) {
+ String realmId = realmContext.getRealmIdentifier();
+ return perRealmBuckets.computeIfAbsent(
+ realmId,
+ k ->
+ new TokenBucket(
+ requestsPerSecond, Math.multiplyExact(requestsPerSecond,
windowSeconds), clock));
+ }
+}
diff --git
a/service/common/src/main/java/org/apache/polaris/service/ratelimiter/NoOpRateLimiter.java
b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/NoOpRateLimiter.java
index 7323c46b..b20c0169 100644
---
a/service/common/src/main/java/org/apache/polaris/service/ratelimiter/NoOpRateLimiter.java
+++
b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/NoOpRateLimiter.java
@@ -24,7 +24,7 @@ import io.smallrye.common.annotation.Identifier;
@Identifier("no-op")
public class NoOpRateLimiter implements RateLimiter {
@Override
- public boolean tryAcquire() {
+ public boolean canProceed() {
return true;
}
}
diff --git
a/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiter.java
b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiter.java
index be2017d3..da84fcf0 100644
---
a/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiter.java
+++
b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiter.java
@@ -26,5 +26,5 @@ public interface RateLimiter {
*
* @return Whether the request is allowed to proceed by the rate limiter
*/
- boolean tryAcquire();
+ boolean canProceed();
}
diff --git
a/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java
b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java
index c5ad957b..d72def5f 100644
---
a/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java
+++
b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java
@@ -42,7 +42,7 @@ public class RateLimiterFilter implements
ContainerRequestFilter {
/** Returns a 429 if the rate limiter says so. Otherwise, forwards the
request along. */
@Override
public void filter(ContainerRequestContext ctx) throws IOException {
- if (!rateLimiter.tryAcquire()) {
+ if (!rateLimiter.canProceed()) {
ctx.abortWith(Response.status(Response.Status.TOO_MANY_REQUESTS).build());
LOGGER.atDebug().log("Rate limiting request");
}
diff --git
a/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RealmTokenBucketRateLimiter.java
b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RealmTokenBucketRateLimiter.java
index 85ea54e2..0d1deb88 100644
---
a/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RealmTokenBucketRateLimiter.java
+++
b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RealmTokenBucketRateLimiter.java
@@ -18,36 +18,19 @@
*/
package org.apache.polaris.service.ratelimiter;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import io.smallrye.common.annotation.Identifier;
-import java.time.Clock;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
+import jakarta.inject.Inject;
import org.apache.polaris.core.context.CallContext;
-import org.apache.polaris.core.context.RealmContext;
/**
- * Rate limiter that maps the request's realm identifier to its own
TokenBucketRateLimiter, with its
- * own capacity.
+ * Rate limiter that maps the request's realm identifier to its own
TokenBucket, with its own
+ * capacity.
*/
@Identifier("realm-token-bucket")
public class RealmTokenBucketRateLimiter implements RateLimiter {
- private final long requestsPerSecond;
- private final long windowSeconds;
- private final Map<String, RateLimiter> perRealmLimiters;
- @VisibleForTesting
- @JsonCreator
- public RealmTokenBucketRateLimiter(
- @JsonProperty("requestsPerSecond") final long requestsPerSecond,
- @JsonProperty("windowSeconds") final long windowSeconds) {
- this.requestsPerSecond = requestsPerSecond;
- this.windowSeconds = windowSeconds;
- this.perRealmLimiters = new ConcurrentHashMap<>();
- }
+ @Inject protected TokenBucketFactory tokenBucketFactory;
/**
* This signifies that a request is being made. That is, the rate limiter
should count the request
@@ -56,26 +39,14 @@ public class RealmTokenBucketRateLimiter implements
RateLimiter {
* @return Whether the request is allowed to proceed by the rate limiter
*/
@Override
- public boolean tryAcquire() {
- String key =
- Optional.ofNullable(CallContext.getCurrentContext())
- .map(CallContext::getRealmContext)
- .map(RealmContext::getRealmIdentifier)
- .orElse("");
-
- return perRealmLimiters
- .computeIfAbsent(
- key,
- (k) ->
- new TokenBucketRateLimiter(
- requestsPerSecond,
- Math.multiplyExact(requestsPerSecond, windowSeconds),
- getClock()))
+ public boolean canProceed() {
+ return tokenBucketFactory
+
.getOrCreateTokenBucket(CallContext.getCurrentContext().getRealmContext())
.tryAcquire();
}
@VisibleForTesting
- protected Clock getClock() {
- return Clock.systemUTC();
+ public void setTokenBucketFactory(TokenBucketFactory tokenBucketFactory) {
+ this.tokenBucketFactory = tokenBucketFactory;
}
}
diff --git
a/service/common/src/main/java/org/apache/polaris/service/ratelimiter/TokenBucketRateLimiter.java
b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/TokenBucket.java
similarity index 82%
rename from
service/common/src/main/java/org/apache/polaris/service/ratelimiter/TokenBucketRateLimiter.java
rename to
service/common/src/main/java/org/apache/polaris/service/ratelimiter/TokenBucket.java
index 2b3adb61..d610a6a0 100644
---
a/service/common/src/main/java/org/apache/polaris/service/ratelimiter/TokenBucketRateLimiter.java
+++
b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/TokenBucket.java
@@ -18,15 +18,13 @@
*/
package org.apache.polaris.service.ratelimiter;
-import io.smallrye.common.annotation.Identifier;
import java.time.InstantSource;
/**
- * Token bucket implementation of a Polaris RateLimiter. Acquires tokens at a
fixed rate and has a
- * maximum amount of tokens. Each successful "tryAcquire" costs 1 token.
+ * General-purpose Token bucket implementation. Acquires tokens at a fixed
rate and has a maximum
+ * amount of tokens. Each successful "tryAcquire" costs 1 token.
*/
-@Identifier("token-bucket")
-public class TokenBucketRateLimiter implements RateLimiter {
+public class TokenBucket {
private final double tokensPerMilli;
private final long maxTokens;
private final InstantSource instantSource;
@@ -34,7 +32,7 @@ public class TokenBucketRateLimiter implements RateLimiter {
private double tokens;
private long lastTokenGenerationMillis;
- public TokenBucketRateLimiter(long tokensPerSecond, long maxTokens,
InstantSource instantSource) {
+ public TokenBucket(long tokensPerSecond, long maxTokens, InstantSource
instantSource) {
this.tokensPerMilli = tokensPerSecond / 1000D;
this.maxTokens = maxTokens;
this.instantSource = instantSource;
@@ -48,7 +46,6 @@ public class TokenBucketRateLimiter implements RateLimiter {
*
* @return whether a token was successfully acquired and spent
*/
- @Override
public synchronized boolean tryAcquire() {
// Grant tokens for the time that has passed since our last tryAcquire()
long t = instantSource.millis();
diff --git
a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/RateLimitResultAsserter.java
b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/TokenBucketFactory.java
similarity index 53%
rename from
dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/RateLimitResultAsserter.java
rename to
service/common/src/main/java/org/apache/polaris/service/ratelimiter/TokenBucketFactory.java
index e15ecbec..4ea382ba 100644
---
a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/RateLimitResultAsserter.java
+++
b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/TokenBucketFactory.java
@@ -16,28 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.polaris.service.dropwizard.ratelimiter;
+package org.apache.polaris.service.ratelimiter;
-import org.apache.polaris.service.ratelimiter.RateLimiter;
-import org.junit.jupiter.api.Assertions;
+import org.apache.polaris.core.context.RealmContext;
-/** Utility class for testing rate limiters. Lets you easily assert the result
of tryAcquire(). */
-public class RateLimitResultAsserter {
- private final RateLimiter rateLimiter;
+/** Factory for creating token buckets per realm. */
+public interface TokenBucketFactory {
- public RateLimitResultAsserter(RateLimiter rateLimiter) {
- this.rateLimiter = rateLimiter;
- }
-
- public void canAcquire(int times) {
- for (int i = 0; i < times; i++) {
- Assertions.assertTrue(rateLimiter.tryAcquire());
- }
- }
-
- public void cantAcquire() {
- for (int i = 0; i < 5; i++) {
- Assertions.assertFalse(rateLimiter.tryAcquire());
- }
- }
+ TokenBucket getOrCreateTokenBucket(RealmContext realmContext);
}