This is an automated email from the ASF dual-hosted git repository.
yufei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new 81ec424 Rate limiter (#278)
81ec424 is described below
commit 81ec424de5bf4e466660b15dca1778026be07e9e
Author: Andrew Guterman <[email protected]>
AuthorDate: Tue Sep 24 09:06:53 2024 -0700
Rate limiter (#278)
---
polaris-server.yml | 4 +
.../apache/polaris/service/PolarisApplication.java | 9 ++
.../service/config/PolarisApplicationConfig.java | 13 +++
.../service/ratelimiter/NoOpRateLimiter.java | 30 +++++++
.../polaris/service/ratelimiter/RateLimiter.java | 34 ++++++++
.../service/ratelimiter/RateLimiterFilter.java | 56 +++++++++++++
.../ratelimiter/RealmTokenBucketRateLimiter.java | 81 ++++++++++++++++++
.../ratelimiter/TokenBucketRateLimiter.java | 64 ++++++++++++++
.../services/io.dropwizard.jackson.Discoverable | 1 +
...apache.polaris.service.ratelimiter.RateLimiter} | 9 +-
.../MockRealmTokenBucketRateLimiter.java | 45 ++++++++++
.../ratelimiter/RateLimitResultAsserter.java | 42 ++++++++++
.../service/ratelimiter/RateLimiterFilterTest.java | 87 +++++++++++++++++++
.../RealmTokenBucketRateLimiterTest.java | 57 +++++++++++++
.../polaris/service/ratelimiter/TestUtil.java | 56 +++++++++++++
.../ratelimiter/TokenBucketRateLimiterTest.java | 97 ++++++++++++++++++++++
...apache.polaris.service.ratelimiter.RateLimiter} | 8 +-
.../resources/polaris-server-integrationtest.yml | 6 ++
18 files changed, 685 insertions(+), 14 deletions(-)
diff --git a/polaris-server.yml b/polaris-server.yml
index 3c751d6..f3813ac 100644
--- a/polaris-server.yml
+++ b/polaris-server.yml
@@ -166,3 +166,7 @@ logging:
# Limits the size of request bodies sent to Polaris. -1 means no limit.
maxRequestBodyBytes: -1
+
+# Optional. Omitting the "rateLimiter" section also means no rate limiter is used.
+rateLimiter:
+ type: no-op
diff --git
a/polaris-service/src/main/java/org/apache/polaris/service/PolarisApplication.java
b/polaris-service/src/main/java/org/apache/polaris/service/PolarisApplication.java
index 5706fb2..e62e871 100644
---
a/polaris-service/src/main/java/org/apache/polaris/service/PolarisApplication.java
+++
b/polaris-service/src/main/java/org/apache/polaris/service/PolarisApplication.java
@@ -101,6 +101,7 @@ import
org.apache.polaris.service.context.CallContextResolver;
import org.apache.polaris.service.context.PolarisCallContextCatalogFactory;
import org.apache.polaris.service.context.RealmContextResolver;
import
org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory;
+import org.apache.polaris.service.ratelimiter.RateLimiterFilter;
import
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
import org.apache.polaris.service.task.ManifestFileCleanupTaskHandler;
import org.apache.polaris.service.task.TableCleanupTaskHandler;
@@ -252,6 +253,14 @@ public class PolarisApplication extends
Application<PolarisApplicationConfig> {
.servlets()
.addFilter("tracing", new TracingFilter(openTelemetry))
.addMappingForUrlPatterns(EnumSet.of(DispatcherType.REQUEST), true,
"/*");
+
+ if (configuration.getRateLimiter() != null) {
+ environment
+ .servlets()
+ .addFilter("ratelimiter", new
RateLimiterFilter(configuration.getRateLimiter()))
+ .addMappingForUrlPatterns(EnumSet.of(DispatcherType.REQUEST), true,
"/*");
+ }
+
DiscoverableAuthenticator<String, AuthenticatedPolarisPrincipal>
authenticator =
configuration.getPolarisAuthenticator();
authenticator.setEntityManagerFactory(entityManagerFactory);
diff --git
a/polaris-service/src/main/java/org/apache/polaris/service/config/PolarisApplicationConfig.java
b/polaris-service/src/main/java/org/apache/polaris/service/config/PolarisApplicationConfig.java
index 56b83b9..f02222f 100644
---
a/polaris-service/src/main/java/org/apache/polaris/service/config/PolarisApplicationConfig.java
+++
b/polaris-service/src/main/java/org/apache/polaris/service/config/PolarisApplicationConfig.java
@@ -24,6 +24,7 @@ import io.dropwizard.core.Configuration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.polaris.core.PolarisConfigurationStore;
import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
@@ -32,6 +33,7 @@ import
org.apache.polaris.service.auth.DiscoverableAuthenticator;
import org.apache.polaris.service.catalog.FileIOFactory;
import org.apache.polaris.service.context.CallContextResolver;
import org.apache.polaris.service.context.RealmContextResolver;
+import org.apache.polaris.service.ratelimiter.RateLimiter;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -55,6 +57,7 @@ public class PolarisApplicationConfig extends Configuration {
private String awsAccessKey;
private String awsSecretKey;
private FileIOFactory fileIOFactory;
+ private RateLimiter rateLimiter;
public static final long REQUEST_BODY_BYTES_NO_LIMIT = -1;
private long maxRequestBodyBytes = REQUEST_BODY_BYTES_NO_LIMIT;
@@ -137,6 +140,16 @@ public class PolarisApplicationConfig extends
Configuration {
this.corsConfiguration = corsConfiguration;
}
+ @JsonProperty("rateLimiter")
+ public RateLimiter getRateLimiter() {
+ return rateLimiter;
+ }
+
+ @JsonProperty("rateLimiter")
+ public void setRateLimiter(@Nullable RateLimiter rateLimiter) {
+ this.rateLimiter = rateLimiter;
+ }
+
public void setTaskHandler(TaskHandlerConfiguration taskHandler) {
this.taskHandler = taskHandler;
}
diff --git
a/polaris-service/src/main/java/org/apache/polaris/service/ratelimiter/NoOpRateLimiter.java
b/polaris-service/src/main/java/org/apache/polaris/service/ratelimiter/NoOpRateLimiter.java
new file mode 100644
index 0000000..33dffd7
--- /dev/null
+++
b/polaris-service/src/main/java/org/apache/polaris/service/ratelimiter/NoOpRateLimiter.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.ratelimiter;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/** Rate limiter that always allows the request */
+@JsonTypeName("no-op")
+public class NoOpRateLimiter implements RateLimiter {
+ @Override
+ public boolean tryAcquire() {
+ return true;
+ }
+}
diff --git
a/polaris-service/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiter.java
b/polaris-service/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiter.java
new file mode 100644
index 0000000..5d102b6
--- /dev/null
+++
b/polaris-service/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.ratelimiter;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import io.dropwizard.jackson.Discoverable;
+
+/** Interface for rate limiting requests */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY,
property = "type")
+public interface RateLimiter extends Discoverable {
+ /**
+ * This signifies that a request is being made. That is, the rate limiter
should count the request
+ * at this point.
+ *
+ * @return Whether the request is allowed to proceed by the rate limiter
+ */
+ boolean tryAcquire();
+}
diff --git
a/polaris-service/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java
b/polaris-service/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java
new file mode 100644
index 0000000..580da51
--- /dev/null
+++
b/polaris-service/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.ratelimiter;
+
+import jakarta.annotation.Priority;
+import jakarta.servlet.Filter;
+import jakarta.servlet.FilterChain;
+import jakarta.servlet.ServletException;
+import jakarta.servlet.ServletRequest;
+import jakarta.servlet.ServletResponse;
+import jakarta.servlet.http.HttpServletResponse;
+import jakarta.ws.rs.Priorities;
+import jakarta.ws.rs.core.Response;
+import java.io.IOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Request filter that returns a 429 Too Many Requests if the rate limiter
says so */
+@Priority(Priorities.AUTHORIZATION + 1)
+public class RateLimiterFilter implements Filter {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RateLimiterFilter.class);
+
+ private final RateLimiter rateLimiter;
+
+ public RateLimiterFilter(RateLimiter rateLimiter) {
+ this.rateLimiter = rateLimiter;
+ }
+
+ /** Returns a 429 if the rate limiter says so. Otherwise, forwards the
request along. */
+ @Override
+ public void doFilter(ServletRequest request, ServletResponse response,
FilterChain chain)
+ throws IOException, ServletException {
+ if (response instanceof HttpServletResponse servletResponse &&
!rateLimiter.tryAcquire()) {
+
servletResponse.setStatus(Response.Status.TOO_MANY_REQUESTS.getStatusCode());
+ LOGGER.atDebug().log("Rate limiting request");
+ return;
+ }
+ chain.doFilter(request, response);
+ }
+}
diff --git
a/polaris-service/src/main/java/org/apache/polaris/service/ratelimiter/RealmTokenBucketRateLimiter.java
b/polaris-service/src/main/java/org/apache/polaris/service/ratelimiter/RealmTokenBucketRateLimiter.java
new file mode 100644
index 0000000..bc45153
--- /dev/null
+++
b/polaris-service/src/main/java/org/apache/polaris/service/ratelimiter/RealmTokenBucketRateLimiter.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.ratelimiter;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import java.time.Clock;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.context.RealmContext;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * Rate limiter that maps the request's realm identifier to its own
TokenBucketRateLimiter, with its
+ * own capacity.
+ */
+@JsonTypeName("realm-token-bucket")
+public class RealmTokenBucketRateLimiter implements RateLimiter {
+ private final long requestsPerSecond;
+ private final long windowSeconds;
+ private final Map<String, RateLimiter> perRealmLimiters;
+
+ @VisibleForTesting
+ @JsonCreator
+ public RealmTokenBucketRateLimiter(
+ @JsonProperty("requestsPerSecond") final long requestsPerSecond,
+ @JsonProperty("windowSeconds") final long windowSeconds) {
+ this.requestsPerSecond = requestsPerSecond;
+ this.windowSeconds = windowSeconds;
+ this.perRealmLimiters = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * This signifies that a request is being made. That is, the rate limiter
should count the request
+ * at this point.
+ *
+ * @return Whether the request is allowed to proceed by the rate limiter
+ */
+ @Override
+ public boolean tryAcquire() {
+ String key =
+ Optional.ofNullable(CallContext.getCurrentContext())
+ .map(CallContext::getRealmContext)
+ .map(RealmContext::getRealmIdentifier)
+ .orElse("");
+
+ return perRealmLimiters
+ .computeIfAbsent(
+ key,
+ (k) ->
+ new TokenBucketRateLimiter(
+ requestsPerSecond,
+ Math.multiplyExact(requestsPerSecond, windowSeconds),
+ getClock()))
+ .tryAcquire();
+ }
+
+ @VisibleForTesting
+ protected Clock getClock() {
+ return Clock.systemUTC();
+ }
+}
diff --git
a/polaris-service/src/main/java/org/apache/polaris/service/ratelimiter/TokenBucketRateLimiter.java
b/polaris-service/src/main/java/org/apache/polaris/service/ratelimiter/TokenBucketRateLimiter.java
new file mode 100644
index 0000000..57ef0e2
--- /dev/null
+++
b/polaris-service/src/main/java/org/apache/polaris/service/ratelimiter/TokenBucketRateLimiter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.ratelimiter;
+
+import java.time.InstantSource;
+
+/**
+ * Token bucket implementation of a Polaris RateLimiter. Acquires tokens at a
fixed rate and has a
+ * maximum number of tokens. Each successful "tryAcquire" costs 1 token.
+ */
+public class TokenBucketRateLimiter implements RateLimiter {
+ private final double tokensPerMilli;
+ private final long maxTokens;
+ private final InstantSource instantSource;
+
+ private double tokens;
+ private long lastTokenGenerationMillis;
+
+ public TokenBucketRateLimiter(long tokensPerSecond, long maxTokens,
InstantSource instantSource) {
+ this.tokensPerMilli = tokensPerSecond / 1000D;
+ this.maxTokens = maxTokens;
+ this.instantSource = instantSource;
+
+ tokens = maxTokens;
+ lastTokenGenerationMillis = instantSource.millis();
+ }
+
+ /**
+ * Tries to acquire and spend 1 token. Doesn't block if a token isn't
available.
+ *
+ * @return whether a token was successfully acquired and spent
+ */
+ @Override
+ public synchronized boolean tryAcquire() {
+ // Grant tokens for the time that has passed since our last tryAcquire()
+ long t = instantSource.millis();
+ long millisPassed = Math.subtractExact(t, lastTokenGenerationMillis);
+ lastTokenGenerationMillis = t;
+ tokens = Math.min(maxTokens, tokens + (millisPassed * tokensPerMilli));
+
+ // Take a token if they have one available
+ if (tokens >= 1) {
+ tokens--;
+ return true;
+ }
+ return false;
+ }
+}
diff --git
a/polaris-service/src/main/resources/META-INF/services/io.dropwizard.jackson.Discoverable
b/polaris-service/src/main/resources/META-INF/services/io.dropwizard.jackson.Discoverable
index e290634..d0deb18 100644
---
a/polaris-service/src/main/resources/META-INF/services/io.dropwizard.jackson.Discoverable
+++
b/polaris-service/src/main/resources/META-INF/services/io.dropwizard.jackson.Discoverable
@@ -24,3 +24,4 @@ org.apache.polaris.service.context.RealmContextResolver
org.apache.polaris.service.context.CallContextResolver
org.apache.polaris.service.auth.TokenBrokerFactory
org.apache.polaris.service.catalog.FileIOFactory
+org.apache.polaris.service.ratelimiter.RateLimiter
diff --git
a/polaris-service/src/main/resources/META-INF/services/io.dropwizard.jackson.Discoverable
b/polaris-service/src/main/resources/META-INF/services/org.apache.polaris.service.ratelimiter.RateLimiter
similarity index 67%
copy from
polaris-service/src/main/resources/META-INF/services/io.dropwizard.jackson.Discoverable
copy to
polaris-service/src/main/resources/META-INF/services/org.apache.polaris.service.ratelimiter.RateLimiter
index e290634..461bcc2 100644
---
a/polaris-service/src/main/resources/META-INF/services/io.dropwizard.jackson.Discoverable
+++
b/polaris-service/src/main/resources/META-INF/services/org.apache.polaris.service.ratelimiter.RateLimiter
@@ -17,10 +17,5 @@
# under the License.
#
-org.apache.polaris.service.auth.DiscoverableAuthenticator
-org.apache.polaris.core.persistence.MetaStoreManagerFactory
-org.apache.polaris.service.config.OAuth2ApiService
-org.apache.polaris.service.context.RealmContextResolver
-org.apache.polaris.service.context.CallContextResolver
-org.apache.polaris.service.auth.TokenBrokerFactory
-org.apache.polaris.service.catalog.FileIOFactory
+org.apache.polaris.service.ratelimiter.RealmTokenBucketRateLimiter
+org.apache.polaris.service.ratelimiter.NoOpRateLimiter
diff --git
a/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/MockRealmTokenBucketRateLimiter.java
b/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/MockRealmTokenBucketRateLimiter.java
new file mode 100644
index 0000000..9557009
--- /dev/null
+++
b/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/MockRealmTokenBucketRateLimiter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.ratelimiter;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import org.threeten.extra.MutableClock;
+
+/** RealmTokenBucketRateLimiter with a mock clock */
+@JsonTypeName("mock-realm-token-bucket")
+public class MockRealmTokenBucketRateLimiter extends
RealmTokenBucketRateLimiter {
+ public static MutableClock CLOCK = MutableClock.of(Instant.now(),
ZoneOffset.UTC);
+
+ @JsonCreator
+ public MockRealmTokenBucketRateLimiter(
+ @JsonProperty("requestsPerSecond") final long requestsPerSecond,
+ @JsonProperty("windowSeconds") final long windowSeconds) {
+ super(requestsPerSecond, windowSeconds);
+ }
+
+ @Override
+ protected Clock getClock() {
+ return CLOCK;
+ }
+}
diff --git
a/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/RateLimitResultAsserter.java
b/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/RateLimitResultAsserter.java
new file mode 100644
index 0000000..a133257
--- /dev/null
+++
b/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/RateLimitResultAsserter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.ratelimiter;
+
+import org.junit.jupiter.api.Assertions;
+
+/** Utility class for testing rate limiters. Lets you easily assert the result
of tryAcquire(). */
+public class RateLimitResultAsserter {
+ private final RateLimiter rateLimiter;
+
+ public RateLimitResultAsserter(RateLimiter rateLimiter) {
+ this.rateLimiter = rateLimiter;
+ }
+
+ public void canAcquire(int times) {
+ for (int i = 0; i < times; i++) {
+ Assertions.assertTrue(rateLimiter.tryAcquire());
+ }
+ }
+
+ public void cantAcquire() {
+ for (int i = 0; i < 5; i++) {
+ Assertions.assertFalse(rateLimiter.tryAcquire());
+ }
+ }
+}
diff --git
a/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/RateLimiterFilterTest.java
b/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/RateLimiterFilterTest.java
new file mode 100644
index 0000000..a188b05
--- /dev/null
+++
b/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/RateLimiterFilterTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.ratelimiter;
+
+import io.dropwizard.testing.ConfigOverride;
+import io.dropwizard.testing.ResourceHelpers;
+import io.dropwizard.testing.junit5.DropwizardAppExtension;
+import io.dropwizard.testing.junit5.DropwizardExtensionsSupport;
+import jakarta.ws.rs.core.Response;
+import java.time.Duration;
+import java.util.function.Consumer;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.service.PolarisApplication;
+import org.apache.polaris.service.config.PolarisApplicationConfig;
+import org.apache.polaris.service.test.PolarisConnectionExtension;
+import org.apache.polaris.service.test.PolarisRealm;
+import org.apache.polaris.service.test.SnowmanCredentialsExtension;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.threeten.extra.MutableClock;
+
+/** Main integration tests for rate limiting */
+@ExtendWith({
+ DropwizardExtensionsSupport.class,
+ PolarisConnectionExtension.class,
+ SnowmanCredentialsExtension.class
+})
+public class RateLimiterFilterTest {
+ private static final long REQUESTS_PER_SECOND = 5;
+ private static final long WINDOW_SECONDS = 10;
+ private static final DropwizardAppExtension<PolarisApplicationConfig> EXT =
+ new DropwizardAppExtension<>(
+ PolarisApplication.class,
+
ResourceHelpers.resourceFilePath("polaris-server-integrationtest.yml"),
+ ConfigOverride.config(
+ "server.applicationConnectors[0].port",
+ "0"), // Bind to random port to support parallelism
+ ConfigOverride.config("server.adminConnectors[0].port", "0"),
+ ConfigOverride.config("rateLimiter.type", "mock-realm-token-bucket"),
+ ConfigOverride.config(
+ "rateLimiter.requestsPerSecond",
String.valueOf(REQUESTS_PER_SECOND)),
+ ConfigOverride.config("rateLimiter.windowSeconds",
String.valueOf(WINDOW_SECONDS)));
+
+ private static String userToken;
+ private static String realm;
+
+ @BeforeAll
+ public static void setup(
+ PolarisConnectionExtension.PolarisToken userToken, @PolarisRealm String
polarisRealm) {
+ realm = polarisRealm;
+ RateLimiterFilterTest.userToken = userToken.token();
+ }
+
+ @Test
+ public void testRateLimiter() {
+ Consumer<Response.Status> requestAsserter =
+ TestUtil.constructRequestAsserter(EXT, userToken, realm);
+ CallContext.setCurrentContext(CallContext.of(() -> "myrealm", null));
+
+ MutableClock clock = MockRealmTokenBucketRateLimiter.CLOCK;
+ clock.add(Duration.ofSeconds(2 * WINDOW_SECONDS)); // Clear any counters
from before this test
+
+ for (int i = 0; i < REQUESTS_PER_SECOND * WINDOW_SECONDS; i++) {
+ requestAsserter.accept(Response.Status.OK);
+ }
+ requestAsserter.accept(Response.Status.TOO_MANY_REQUESTS);
+
+ clock.add(Duration.ofSeconds(4 * WINDOW_SECONDS)); // Clear any counters
from during this test
+ }
+}
diff --git
a/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/RealmTokenBucketRateLimiterTest.java
b/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/RealmTokenBucketRateLimiterTest.java
new file mode 100644
index 0000000..10cd71c
--- /dev/null
+++
b/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/RealmTokenBucketRateLimiterTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.ratelimiter;
+
+import java.time.Duration;
+import org.apache.polaris.core.context.CallContext;
+import org.junit.jupiter.api.Test;
+import org.threeten.extra.MutableClock;
+
+/** Main unit test class for RealmTokenBucketRateLimiter */
+public class RealmTokenBucketRateLimiterTest {
+ @Test
+ void testDifferentBucketsDontTouch() {
+ RateLimiter rateLimiter = new MockRealmTokenBucketRateLimiter(10, 10);
+ RateLimitResultAsserter asserter = new
RateLimitResultAsserter(rateLimiter);
+ MutableClock clock = MockRealmTokenBucketRateLimiter.CLOCK;
+
+ for (int i = 0; i < 202; i++) {
+ String realm = (i % 2 == 0) ? "realm1" : "realm2";
+ CallContext.setCurrentContext(CallContext.of(() -> realm, null));
+
+ if (i < 200) {
+ asserter.canAcquire(1);
+ } else {
+ asserter.cantAcquire();
+ }
+ }
+
+ clock.add(Duration.ofSeconds(1));
+ for (int i = 0; i < 22; i++) {
+ String realm = (i % 2 == 0) ? "realm1" : "realm2";
+ CallContext.setCurrentContext(CallContext.of(() -> realm, null));
+
+ if (i < 20) {
+ asserter.canAcquire(1);
+ } else {
+ asserter.cantAcquire();
+ }
+ }
+ }
+}
diff --git
a/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/TestUtil.java
b/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/TestUtil.java
new file mode 100644
index 0000000..43404be
--- /dev/null
+++
b/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/TestUtil.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.ratelimiter;
+
+import static
org.apache.polaris.service.context.DefaultContextResolver.REALM_PROPERTY_KEY;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import io.dropwizard.testing.junit5.DropwizardAppExtension;
+import jakarta.ws.rs.core.Response;
+import java.util.function.Consumer;
+import org.apache.polaris.service.config.PolarisApplicationConfig;
+
+/** Common test utils for testing rate limiting */
+public class TestUtil {
+ /**
+ * Constructs a function that makes a request to list all principal roles
and asserts the status
+ * of the response. This is a relatively simple type of request that can be
used for validating
+ * whether the rate limiter intervenes.
+ */
+ public static Consumer<Response.Status> constructRequestAsserter(
+ DropwizardAppExtension<PolarisApplicationConfig> dropwizardAppExtension,
+ String userToken,
+ String realm) {
+ return (Response.Status status) -> {
+ try (Response response =
+ dropwizardAppExtension
+ .client()
+ .target(
+ String.format(
+ "http://localhost:%d/api/management/v1/principal-roles",
+ dropwizardAppExtension.getLocalPort()))
+ .request("application/json")
+ .header("Authorization", "Bearer " + userToken)
+ .header(REALM_PROPERTY_KEY, realm)
+ .get()) {
+ assertThat(response).returns(status.getStatusCode(),
Response::getStatus);
+ }
+ };
+ }
+}
diff --git
a/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/TokenBucketRateLimiterTest.java
b/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/TokenBucketRateLimiterTest.java
new file mode 100644
index 0000000..57a62e2
--- /dev/null
+++
b/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/TokenBucketRateLimiterTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.ratelimiter;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.threeten.extra.MutableClock;
+
+/** Main unit test class for TokenBucketRateLimiter */
+public class TokenBucketRateLimiterTest {
+  /**
+   * Drives a limiter constructed with (tokensPerSecond = 10, maxTokens = 100) through a
+   * drain / partial refill / full refill cycle, advancing a mutable clock by hand between steps.
+   */
+  @Test
+  void testBasic() {
+    MutableClock clock = MutableClock.of(Instant.now(), ZoneOffset.UTC);
+    clock.add(Duration.ofSeconds(5));
+
+    RateLimitResultAsserter asserter =
+        new RateLimitResultAsserter(new TokenBucketRateLimiter(10, 100, clock));
+
+    // Initial state: 100 acquisitions are expected to succeed, the next one to be rejected.
+    asserter.canAcquire(100);
+    asserter.cantAcquire();
+
+    // One second later: 10 more acquisitions are expected, then rejection again.
+    clock.add(Duration.ofSeconds(1));
+    asserter.canAcquire(10);
+    asserter.cantAcquire();
+
+    // Ten seconds later: 100 acquisitions are expected again, then rejection.
+    clock.add(Duration.ofSeconds(10));
+    asserter.canAcquire(100);
+    asserter.cantAcquire();
+  }
+
+  /**
+   * Starts several threads that try to query the rate limiter at the same time, ensuring that we
+   * only allow "maxTokens" requests. The clock is fixed, so time never advances during the test.
+   *
+   * @throws InterruptedException if the test thread is interrupted while waiting for the tasks
+   */
+  @Test
+  void testConcurrent() throws InterruptedException {
+    int maxTokens = 100;
+    int numTasks = 50000;
+    int tokensPerSecond = 10; // Can be anything above 0
+
+    TokenBucketRateLimiter rl =
+        new TokenBucketRateLimiter(
+            tokensPerSecond, maxTokens, Clock.fixed(Instant.now(), ZoneOffset.UTC));
+    AtomicInteger numAcquired = new AtomicInteger();
+    CountDownLatch startLatch = new CountDownLatch(numTasks);
+    CountDownLatch endLatch = new CountDownLatch(numTasks);
+
+    try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
+      for (int i = 0; i < numTasks; i++) {
+        executor.submit(
+            () -> {
+              try {
+                // Enforce that tasks pause until all tasks are submitted
+                startLatch.countDown();
+                startLatch.await();
+
+                if (rl.tryAcquire()) {
+                  numAcquired.incrementAndGet();
+                }
+              } catch (InterruptedException e) {
+                // Restore the interrupt flag before surfacing the failure.
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+              } finally {
+                // Always count down, even when the task fails, so that the await below cannot
+                // block forever and the test fails on the assertion instead of hanging.
+                endLatch.countDown();
+              }
+            });
+      }
+    }
+
+    endLatch.await();
+    Assertions.assertEquals(maxTokens, numAcquired.get());
+  }
+}
diff --git
a/polaris-service/src/main/resources/META-INF/services/io.dropwizard.jackson.Discoverable
b/polaris-service/src/test/resources/META-INF/services/org.apache.polaris.service.ratelimiter.RateLimiter
similarity index 67%
copy from
polaris-service/src/main/resources/META-INF/services/io.dropwizard.jackson.Discoverable
copy to
polaris-service/src/test/resources/META-INF/services/org.apache.polaris.service.ratelimiter.RateLimiter
index e290634..c43c88a 100644
---
a/polaris-service/src/main/resources/META-INF/services/io.dropwizard.jackson.Discoverable
+++
b/polaris-service/src/test/resources/META-INF/services/org.apache.polaris.service.ratelimiter.RateLimiter
@@ -17,10 +17,4 @@
# under the License.
#
-org.apache.polaris.service.auth.DiscoverableAuthenticator
-org.apache.polaris.core.persistence.MetaStoreManagerFactory
-org.apache.polaris.service.config.OAuth2ApiService
-org.apache.polaris.service.context.RealmContextResolver
-org.apache.polaris.service.context.CallContextResolver
-org.apache.polaris.service.auth.TokenBrokerFactory
-org.apache.polaris.service.catalog.FileIOFactory
+org.apache.polaris.service.ratelimiter.MockRealmTokenBucketRateLimiter
diff --git
a/polaris-service/src/test/resources/polaris-server-integrationtest.yml
b/polaris-service/src/test/resources/polaris-server-integrationtest.yml
index d6a1aac..4b430f3 100644
--- a/polaris-service/src/test/resources/polaris-server-integrationtest.yml
+++ b/polaris-service/src/test/resources/polaris-server-integrationtest.yml
@@ -152,3 +152,9 @@ logging:
# Limits the size of request bodies sent to Polaris. -1 means no limit.
maxRequestBodyBytes: 1000000
+
+# Limits the request rate per realm, using the rate limiter registered as "realm-token-bucket".
+# NOTE(review): requestsPerSecond and windowSeconds presumably set the sustained rate and the
+# length of the window over which that rate is enforced - confirm against the limiter class.
+rateLimiter:
+ type: realm-token-bucket
+ requestsPerSecond: 10
+ windowSeconds: 10