This is an automated email from the ASF dual-hosted git repository.

yufei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new 81ec424  Rate limiter (#278)
81ec424 is described below

commit 81ec424de5bf4e466660b15dca1778026be07e9e
Author: Andrew Guterman <[email protected]>
AuthorDate: Tue Sep 24 09:06:53 2024 -0700

    Rate limiter (#278)
---
 polaris-server.yml                                 |  4 +
 .../apache/polaris/service/PolarisApplication.java |  9 ++
 .../service/config/PolarisApplicationConfig.java   | 13 +++
 .../service/ratelimiter/NoOpRateLimiter.java       | 30 +++++++
 .../polaris/service/ratelimiter/RateLimiter.java   | 34 ++++++++
 .../service/ratelimiter/RateLimiterFilter.java     | 56 +++++++++++++
 .../ratelimiter/RealmTokenBucketRateLimiter.java   | 81 ++++++++++++++++++
 .../ratelimiter/TokenBucketRateLimiter.java        | 64 ++++++++++++++
 .../services/io.dropwizard.jackson.Discoverable    |  1 +
 ...apache.polaris.service.ratelimiter.RateLimiter} |  9 +-
 .../MockRealmTokenBucketRateLimiter.java           | 45 ++++++++++
 .../ratelimiter/RateLimitResultAsserter.java       | 42 ++++++++++
 .../service/ratelimiter/RateLimiterFilterTest.java | 87 +++++++++++++++++++
 .../RealmTokenBucketRateLimiterTest.java           | 57 +++++++++++++
 .../polaris/service/ratelimiter/TestUtil.java      | 56 +++++++++++++
 .../ratelimiter/TokenBucketRateLimiterTest.java    | 97 ++++++++++++++++++++++
 ...apache.polaris.service.ratelimiter.RateLimiter} |  8 +-
 .../resources/polaris-server-integrationtest.yml   |  6 ++
 18 files changed, 685 insertions(+), 14 deletions(-)

diff --git a/polaris-server.yml b/polaris-server.yml
index 3c751d6..f3813ac 100644
--- a/polaris-server.yml
+++ b/polaris-server.yml
@@ -166,3 +166,7 @@ logging:
 
 # Limits the size of request bodies sent to Polaris. -1 means no limit.
 maxRequestBodyBytes: -1
+
+# Optional: omitting the "rateLimiter" section entirely also disables rate limiting
+rateLimiter:
+  type: no-op
diff --git 
a/polaris-service/src/main/java/org/apache/polaris/service/PolarisApplication.java
 
b/polaris-service/src/main/java/org/apache/polaris/service/PolarisApplication.java
index 5706fb2..e62e871 100644
--- 
a/polaris-service/src/main/java/org/apache/polaris/service/PolarisApplication.java
+++ 
b/polaris-service/src/main/java/org/apache/polaris/service/PolarisApplication.java
@@ -101,6 +101,7 @@ import 
org.apache.polaris.service.context.CallContextResolver;
 import org.apache.polaris.service.context.PolarisCallContextCatalogFactory;
 import org.apache.polaris.service.context.RealmContextResolver;
 import 
org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory;
+import org.apache.polaris.service.ratelimiter.RateLimiterFilter;
 import 
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
 import org.apache.polaris.service.task.ManifestFileCleanupTaskHandler;
 import org.apache.polaris.service.task.TableCleanupTaskHandler;
@@ -252,6 +253,14 @@ public class PolarisApplication extends 
Application<PolarisApplicationConfig> {
         .servlets()
         .addFilter("tracing", new TracingFilter(openTelemetry))
         .addMappingForUrlPatterns(EnumSet.of(DispatcherType.REQUEST), true, 
"/*");
+
+    if (configuration.getRateLimiter() != null) {
+      environment
+          .servlets()
+          .addFilter("ratelimiter", new 
RateLimiterFilter(configuration.getRateLimiter()))
+          .addMappingForUrlPatterns(EnumSet.of(DispatcherType.REQUEST), true, 
"/*");
+    }
+
     DiscoverableAuthenticator<String, AuthenticatedPolarisPrincipal> 
authenticator =
         configuration.getPolarisAuthenticator();
     authenticator.setEntityManagerFactory(entityManagerFactory);
diff --git 
a/polaris-service/src/main/java/org/apache/polaris/service/config/PolarisApplicationConfig.java
 
b/polaris-service/src/main/java/org/apache/polaris/service/config/PolarisApplicationConfig.java
index 56b83b9..f02222f 100644
--- 
a/polaris-service/src/main/java/org/apache/polaris/service/config/PolarisApplicationConfig.java
+++ 
b/polaris-service/src/main/java/org/apache/polaris/service/config/PolarisApplicationConfig.java
@@ -24,6 +24,7 @@ import io.dropwizard.core.Configuration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.polaris.core.PolarisConfigurationStore;
 import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
@@ -32,6 +33,7 @@ import 
org.apache.polaris.service.auth.DiscoverableAuthenticator;
 import org.apache.polaris.service.catalog.FileIOFactory;
 import org.apache.polaris.service.context.CallContextResolver;
 import org.apache.polaris.service.context.RealmContextResolver;
+import org.apache.polaris.service.ratelimiter.RateLimiter;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -55,6 +57,7 @@ public class PolarisApplicationConfig extends Configuration {
   private String awsAccessKey;
   private String awsSecretKey;
   private FileIOFactory fileIOFactory;
+  private RateLimiter rateLimiter;
 
   public static final long REQUEST_BODY_BYTES_NO_LIMIT = -1;
   private long maxRequestBodyBytes = REQUEST_BODY_BYTES_NO_LIMIT;
@@ -137,6 +140,16 @@ public class PolarisApplicationConfig extends 
Configuration {
     this.corsConfiguration = corsConfiguration;
   }
 
+  @JsonProperty("rateLimiter")
+  public RateLimiter getRateLimiter() {
+    return rateLimiter;
+  }
+
+  @JsonProperty("rateLimiter")
+  public void setRateLimiter(@Nullable RateLimiter rateLimiter) {
+    this.rateLimiter = rateLimiter;
+  }
+
   public void setTaskHandler(TaskHandlerConfiguration taskHandler) {
     this.taskHandler = taskHandler;
   }
diff --git 
a/polaris-service/src/main/java/org/apache/polaris/service/ratelimiter/NoOpRateLimiter.java
 
b/polaris-service/src/main/java/org/apache/polaris/service/ratelimiter/NoOpRateLimiter.java
new file mode 100644
index 0000000..33dffd7
--- /dev/null
+++ 
b/polaris-service/src/main/java/org/apache/polaris/service/ratelimiter/NoOpRateLimiter.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.ratelimiter;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/** Rate limiter that always allows the request */
+@JsonTypeName("no-op")
+public class NoOpRateLimiter implements RateLimiter {
+  @Override
+  public boolean tryAcquire() {
+    return true;
+  }
+}
diff --git 
a/polaris-service/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiter.java
 
b/polaris-service/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiter.java
new file mode 100644
index 0000000..5d102b6
--- /dev/null
+++ 
b/polaris-service/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.ratelimiter;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import io.dropwizard.jackson.Discoverable;
+
+/** Interface for rate limiting requests */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, 
property = "type")
+public interface RateLimiter extends Discoverable {
+  /**
+   * This signifies that a request is being made. That is, the rate limiter 
should count the request
+   * at this point.
+   *
+   * @return Whether the request is allowed to proceed by the rate limiter
+   */
+  boolean tryAcquire();
+}
diff --git 
a/polaris-service/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java
 
b/polaris-service/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java
new file mode 100644
index 0000000..580da51
--- /dev/null
+++ 
b/polaris-service/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.ratelimiter;
+
+import jakarta.annotation.Priority;
+import jakarta.servlet.Filter;
+import jakarta.servlet.FilterChain;
+import jakarta.servlet.ServletException;
+import jakarta.servlet.ServletRequest;
+import jakarta.servlet.ServletResponse;
+import jakarta.servlet.http.HttpServletResponse;
+import jakarta.ws.rs.Priorities;
+import jakarta.ws.rs.core.Response;
+import java.io.IOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Request filter that returns a 429 Too Many Requests if the rate limiter 
says so */
+@Priority(Priorities.AUTHORIZATION + 1)
+public class RateLimiterFilter implements Filter {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RateLimiterFilter.class);
+
+  private final RateLimiter rateLimiter;
+
+  public RateLimiterFilter(RateLimiter rateLimiter) {
+    this.rateLimiter = rateLimiter;
+  }
+
+  /** Returns a 429 if the rate limiter says so. Otherwise, forwards the 
request along. */
+  @Override
+  public void doFilter(ServletRequest request, ServletResponse response, 
FilterChain chain)
+      throws IOException, ServletException {
+    if (response instanceof HttpServletResponse servletResponse && 
!rateLimiter.tryAcquire()) {
+      
servletResponse.setStatus(Response.Status.TOO_MANY_REQUESTS.getStatusCode());
+      LOGGER.atDebug().log("Rate limiting request");
+      return;
+    }
+    chain.doFilter(request, response);
+  }
+}
diff --git 
a/polaris-service/src/main/java/org/apache/polaris/service/ratelimiter/RealmTokenBucketRateLimiter.java
 
b/polaris-service/src/main/java/org/apache/polaris/service/ratelimiter/RealmTokenBucketRateLimiter.java
new file mode 100644
index 0000000..bc45153
--- /dev/null
+++ 
b/polaris-service/src/main/java/org/apache/polaris/service/ratelimiter/RealmTokenBucketRateLimiter.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.ratelimiter;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import java.time.Clock;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.context.RealmContext;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * Rate limiter that maps the request's realm identifier to its own 
TokenBucketRateLimiter, with its
+ * own capacity.
+ */
+@JsonTypeName("realm-token-bucket")
+public class RealmTokenBucketRateLimiter implements RateLimiter {
+  private final long requestsPerSecond;
+  private final long windowSeconds;
+  private final Map<String, RateLimiter> perRealmLimiters;
+
+  @VisibleForTesting
+  @JsonCreator
+  public RealmTokenBucketRateLimiter(
+      @JsonProperty("requestsPerSecond") final long requestsPerSecond,
+      @JsonProperty("windowSeconds") final long windowSeconds) {
+    this.requestsPerSecond = requestsPerSecond;
+    this.windowSeconds = windowSeconds;
+    this.perRealmLimiters = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * This signifies that a request is being made. That is, the rate limiter 
should count the request
+   * at this point.
+   *
+   * @return Whether the request is allowed to proceed by the rate limiter
+   */
+  @Override
+  public boolean tryAcquire() {
+    String key =
+        Optional.ofNullable(CallContext.getCurrentContext())
+            .map(CallContext::getRealmContext)
+            .map(RealmContext::getRealmIdentifier)
+            .orElse("");
+
+    return perRealmLimiters
+        .computeIfAbsent(
+            key,
+            (k) ->
+                new TokenBucketRateLimiter(
+                    requestsPerSecond,
+                    Math.multiplyExact(requestsPerSecond, windowSeconds),
+                    getClock()))
+        .tryAcquire();
+  }
+
+  @VisibleForTesting
+  protected Clock getClock() {
+    return Clock.systemUTC();
+  }
+}
diff --git 
a/polaris-service/src/main/java/org/apache/polaris/service/ratelimiter/TokenBucketRateLimiter.java
 
b/polaris-service/src/main/java/org/apache/polaris/service/ratelimiter/TokenBucketRateLimiter.java
new file mode 100644
index 0000000..57ef0e2
--- /dev/null
+++ 
b/polaris-service/src/main/java/org/apache/polaris/service/ratelimiter/TokenBucketRateLimiter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.ratelimiter;
+
+import java.time.InstantSource;
+
+/**
+ * Token bucket implementation of a Polaris RateLimiter. Acquires tokens at a 
fixed rate and has a
+ * maximum amount of tokens. Each successful "tryAcquire" costs 1 token.
+ */
+public class TokenBucketRateLimiter implements RateLimiter {
+  private final double tokensPerMilli;
+  private final long maxTokens;
+  private final InstantSource instantSource;
+
+  private double tokens;
+  private long lastTokenGenerationMillis;
+
+  public TokenBucketRateLimiter(long tokensPerSecond, long maxTokens, 
InstantSource instantSource) {
+    this.tokensPerMilli = tokensPerSecond / 1000D;
+    this.maxTokens = maxTokens;
+    this.instantSource = instantSource;
+
+    tokens = maxTokens;
+    lastTokenGenerationMillis = instantSource.millis();
+  }
+
+  /**
+   * Tries to acquire and spend 1 token. Doesn't block if a token isn't 
available.
+   *
+   * @return whether a token was successfully acquired and spent
+   */
+  @Override
+  public synchronized boolean tryAcquire() {
+    // Grant tokens for the time that has passed since our last tryAcquire()
+    long t = instantSource.millis();
+    long millisPassed = Math.subtractExact(t, lastTokenGenerationMillis);
+    lastTokenGenerationMillis = t;
+    tokens = Math.min(maxTokens, tokens + (millisPassed * tokensPerMilli));
+
+    // Spend one token if at least one whole token is available
+    if (tokens >= 1) {
+      tokens--;
+      return true;
+    }
+    return false;
+  }
+}
diff --git 
a/polaris-service/src/main/resources/META-INF/services/io.dropwizard.jackson.Discoverable
 
b/polaris-service/src/main/resources/META-INF/services/io.dropwizard.jackson.Discoverable
index e290634..d0deb18 100644
--- 
a/polaris-service/src/main/resources/META-INF/services/io.dropwizard.jackson.Discoverable
+++ 
b/polaris-service/src/main/resources/META-INF/services/io.dropwizard.jackson.Discoverable
@@ -24,3 +24,4 @@ org.apache.polaris.service.context.RealmContextResolver
 org.apache.polaris.service.context.CallContextResolver
 org.apache.polaris.service.auth.TokenBrokerFactory
 org.apache.polaris.service.catalog.FileIOFactory
+org.apache.polaris.service.ratelimiter.RateLimiter
diff --git 
a/polaris-service/src/main/resources/META-INF/services/io.dropwizard.jackson.Discoverable
 
b/polaris-service/src/main/resources/META-INF/services/org.apache.polaris.service.ratelimiter.RateLimiter
similarity index 67%
copy from 
polaris-service/src/main/resources/META-INF/services/io.dropwizard.jackson.Discoverable
copy to 
polaris-service/src/main/resources/META-INF/services/org.apache.polaris.service.ratelimiter.RateLimiter
index e290634..461bcc2 100644
--- 
a/polaris-service/src/main/resources/META-INF/services/io.dropwizard.jackson.Discoverable
+++ 
b/polaris-service/src/main/resources/META-INF/services/org.apache.polaris.service.ratelimiter.RateLimiter
@@ -17,10 +17,5 @@
 # under the License.
 #
 
-org.apache.polaris.service.auth.DiscoverableAuthenticator
-org.apache.polaris.core.persistence.MetaStoreManagerFactory
-org.apache.polaris.service.config.OAuth2ApiService
-org.apache.polaris.service.context.RealmContextResolver
-org.apache.polaris.service.context.CallContextResolver
-org.apache.polaris.service.auth.TokenBrokerFactory
-org.apache.polaris.service.catalog.FileIOFactory
+org.apache.polaris.service.ratelimiter.RealmTokenBucketRateLimiter
+org.apache.polaris.service.ratelimiter.NoOpRateLimiter
diff --git 
a/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/MockRealmTokenBucketRateLimiter.java
 
b/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/MockRealmTokenBucketRateLimiter.java
new file mode 100644
index 0000000..9557009
--- /dev/null
+++ 
b/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/MockRealmTokenBucketRateLimiter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.ratelimiter;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import org.threeten.extra.MutableClock;
+
+/** RealmTokenBucketRateLimiter with a mock clock */
+@JsonTypeName("mock-realm-token-bucket")
+public class MockRealmTokenBucketRateLimiter extends 
RealmTokenBucketRateLimiter {
+  public static MutableClock CLOCK = MutableClock.of(Instant.now(), 
ZoneOffset.UTC);
+
+  @JsonCreator
+  public MockRealmTokenBucketRateLimiter(
+      @JsonProperty("requestsPerSecond") final long requestsPerSecond,
+      @JsonProperty("windowSeconds") final long windowSeconds) {
+    super(requestsPerSecond, windowSeconds);
+  }
+
+  @Override
+  protected Clock getClock() {
+    return CLOCK;
+  }
+}
diff --git 
a/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/RateLimitResultAsserter.java
 
b/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/RateLimitResultAsserter.java
new file mode 100644
index 0000000..a133257
--- /dev/null
+++ 
b/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/RateLimitResultAsserter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.ratelimiter;
+
+import org.junit.jupiter.api.Assertions;
+
+/** Utility class for testing rate limiters. Lets you easily assert the result 
of tryAcquire(). */
+public class RateLimitResultAsserter {
+  private final RateLimiter rateLimiter;
+
+  public RateLimitResultAsserter(RateLimiter rateLimiter) {
+    this.rateLimiter = rateLimiter;
+  }
+
+  public void canAcquire(int times) {
+    for (int i = 0; i < times; i++) {
+      Assertions.assertTrue(rateLimiter.tryAcquire());
+    }
+  }
+
+  public void cantAcquire() {
+    for (int i = 0; i < 5; i++) {
+      Assertions.assertFalse(rateLimiter.tryAcquire());
+    }
+  }
+}
diff --git 
a/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/RateLimiterFilterTest.java
 
b/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/RateLimiterFilterTest.java
new file mode 100644
index 0000000..a188b05
--- /dev/null
+++ 
b/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/RateLimiterFilterTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.ratelimiter;
+
+import io.dropwizard.testing.ConfigOverride;
+import io.dropwizard.testing.ResourceHelpers;
+import io.dropwizard.testing.junit5.DropwizardAppExtension;
+import io.dropwizard.testing.junit5.DropwizardExtensionsSupport;
+import jakarta.ws.rs.core.Response;
+import java.time.Duration;
+import java.util.function.Consumer;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.service.PolarisApplication;
+import org.apache.polaris.service.config.PolarisApplicationConfig;
+import org.apache.polaris.service.test.PolarisConnectionExtension;
+import org.apache.polaris.service.test.PolarisRealm;
+import org.apache.polaris.service.test.SnowmanCredentialsExtension;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.threeten.extra.MutableClock;
+
+/** Main integration tests for rate limiting */
+@ExtendWith({
+  DropwizardExtensionsSupport.class,
+  PolarisConnectionExtension.class,
+  SnowmanCredentialsExtension.class
+})
+public class RateLimiterFilterTest {
+  private static final long REQUESTS_PER_SECOND = 5;
+  private static final long WINDOW_SECONDS = 10;
+  private static final DropwizardAppExtension<PolarisApplicationConfig> EXT =
+      new DropwizardAppExtension<>(
+          PolarisApplication.class,
+          
ResourceHelpers.resourceFilePath("polaris-server-integrationtest.yml"),
+          ConfigOverride.config(
+              "server.applicationConnectors[0].port",
+              "0"), // Bind to random port to support parallelism
+          ConfigOverride.config("server.adminConnectors[0].port", "0"),
+          ConfigOverride.config("rateLimiter.type", "mock-realm-token-bucket"),
+          ConfigOverride.config(
+              "rateLimiter.requestsPerSecond", 
String.valueOf(REQUESTS_PER_SECOND)),
+          ConfigOverride.config("rateLimiter.windowSeconds", 
String.valueOf(WINDOW_SECONDS)));
+
+  private static String userToken;
+  private static String realm;
+
+  @BeforeAll
+  public static void setup(
+      PolarisConnectionExtension.PolarisToken userToken, @PolarisRealm String 
polarisRealm) {
+    realm = polarisRealm;
+    RateLimiterFilterTest.userToken = userToken.token();
+  }
+
+  @Test
+  public void testRateLimiter() {
+    Consumer<Response.Status> requestAsserter =
+        TestUtil.constructRequestAsserter(EXT, userToken, realm);
+    CallContext.setCurrentContext(CallContext.of(() -> "myrealm", null));
+
+    MutableClock clock = MockRealmTokenBucketRateLimiter.CLOCK;
+    clock.add(Duration.ofSeconds(2 * WINDOW_SECONDS)); // Clear any counters 
from before this test
+
+    for (int i = 0; i < REQUESTS_PER_SECOND * WINDOW_SECONDS; i++) {
+      requestAsserter.accept(Response.Status.OK);
+    }
+    requestAsserter.accept(Response.Status.TOO_MANY_REQUESTS);
+
+    clock.add(Duration.ofSeconds(4 * WINDOW_SECONDS)); // Clear any counters 
from during this test
+  }
+}
diff --git 
a/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/RealmTokenBucketRateLimiterTest.java
 
b/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/RealmTokenBucketRateLimiterTest.java
new file mode 100644
index 0000000..10cd71c
--- /dev/null
+++ 
b/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/RealmTokenBucketRateLimiterTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.ratelimiter;
+
+import java.time.Duration;
+import org.apache.polaris.core.context.CallContext;
+import org.junit.jupiter.api.Test;
+import org.threeten.extra.MutableClock;
+
+/** Main unit test class for RealmTokenBucketRateLimiter */
+public class RealmTokenBucketRateLimiterTest {
+  @Test
+  void testDifferentBucketsDontTouch() {
+    RateLimiter rateLimiter = new MockRealmTokenBucketRateLimiter(10, 10);
+    RateLimitResultAsserter asserter = new 
RateLimitResultAsserter(rateLimiter);
+    MutableClock clock = MockRealmTokenBucketRateLimiter.CLOCK;
+
+    for (int i = 0; i < 202; i++) {
+      String realm = (i % 2 == 0) ? "realm1" : "realm2";
+      CallContext.setCurrentContext(CallContext.of(() -> realm, null));
+
+      if (i < 200) {
+        asserter.canAcquire(1);
+      } else {
+        asserter.cantAcquire();
+      }
+    }
+
+    clock.add(Duration.ofSeconds(1));
+    for (int i = 0; i < 22; i++) {
+      String realm = (i % 2 == 0) ? "realm1" : "realm2";
+      CallContext.setCurrentContext(CallContext.of(() -> realm, null));
+
+      if (i < 20) {
+        asserter.canAcquire(1);
+      } else {
+        asserter.cantAcquire();
+      }
+    }
+  }
+}
diff --git 
a/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/TestUtil.java
 
b/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/TestUtil.java
new file mode 100644
index 0000000..43404be
--- /dev/null
+++ 
b/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/TestUtil.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.ratelimiter;
+
+import static 
org.apache.polaris.service.context.DefaultContextResolver.REALM_PROPERTY_KEY;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import io.dropwizard.testing.junit5.DropwizardAppExtension;
+import jakarta.ws.rs.core.Response;
+import java.util.function.Consumer;
+import org.apache.polaris.service.config.PolarisApplicationConfig;
+
+/** Common test utils for testing rate limiting */
+public class TestUtil {
+  /**
+   * Constructs a function that makes a request to list all principal roles and asserts the status
+   * of the response. This is a relatively simple type of request that can be used for validating
+   * whether the rate limiter intervenes.
+   *
+   * @param dropwizardAppExtension test application; supplies the HTTP client and the local port
+   * @param userToken token sent as {@code Authorization: Bearer <userToken>}
+   * @param realm value sent in the {@code REALM_PROPERTY_KEY} request header
+   * @return a consumer that performs the GET request and asserts that the response status code
+   *     equals the status code of the {@link Response.Status} passed to it
+   */
+  public static Consumer<Response.Status> constructRequestAsserter(
+      DropwizardAppExtension<PolarisApplicationConfig> dropwizardAppExtension,
+      String userToken,
+      String realm) {
+    return (Response.Status status) -> {
+      // The response is closed by try-with-resources once the status has been checked.
+      try (Response response =
+          dropwizardAppExtension
+              .client()
+              .target(
+                  String.format(
+                      "http://localhost:%d/api/management/v1/principal-roles",
+                      dropwizardAppExtension.getLocalPort()))
+              .request("application/json")
+              .header("Authorization", "Bearer " + userToken)
+              .header(REALM_PROPERTY_KEY, realm)
+              .get()) {
+        assertThat(response).returns(status.getStatusCode(), Response::getStatus);
+      }
+    };
+  }
+}
diff --git a/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/TokenBucketRateLimiterTest.java b/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/TokenBucketRateLimiterTest.java
new file mode 100644
index 0000000..57a62e2
--- /dev/null
+++ b/polaris-service/src/test/java/org/apache/polaris/service/ratelimiter/TokenBucketRateLimiterTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.ratelimiter;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.threeten.extra.MutableClock;
+
+/** Main unit test class for TokenBucketRateLimiter */
+public class TokenBucketRateLimiterTest {
+  /**
+   * Drives a limiter built with 10 tokens per second and a maximum of 100 tokens using a mutable
+   * clock, asserting how many acquisitions succeed initially, after one second, and after ten
+   * seconds.
+   */
+  @Test
+  void testBasic() {
+    MutableClock clock = MutableClock.of(Instant.now(), ZoneOffset.UTC);
+    clock.add(Duration.ofSeconds(5));
+
+    RateLimitResultAsserter asserter =
+        new RateLimitResultAsserter(new TokenBucketRateLimiter(10, 100, clock));
+
+    // Exactly 100 acquisitions are expected to succeed before the first rejection.
+    asserter.canAcquire(100);
+    asserter.cantAcquire();
+
+    // One second later, 10 more acquisitions are expected.
+    clock.add(Duration.ofSeconds(1));
+    asserter.canAcquire(10);
+    asserter.cantAcquire();
+
+    // Ten seconds later, 100 acquisitions are expected again.
+    clock.add(Duration.ofSeconds(10));
+    asserter.canAcquire(100);
+    asserter.cantAcquire();
+  }
+
+  /**
+   * Starts several threads that try to query the rate limiter at the same time, ensuring that we
+   * only allow "maxTokens" requests
+   *
+   * @throws InterruptedException if the test thread is interrupted while waiting for the tasks
+   */
+  @Test
+  void testConcurrent() throws InterruptedException {
+    int maxTokens = 100;
+    int numTasks = 50000;
+    int tokensPerSecond = 10; // Can be anything above 0
+
+    // The clock is fixed, so time never advances during the test.
+    TokenBucketRateLimiter rl =
+        new TokenBucketRateLimiter(
+            tokensPerSecond, maxTokens, Clock.fixed(Instant.now(), ZoneOffset.UTC));
+    AtomicInteger numAcquired = new AtomicInteger();
+    CountDownLatch startLatch = new CountDownLatch(numTasks);
+    CountDownLatch endLatch = new CountDownLatch(numTasks);
+
+    try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
+      for (int i = 0; i < numTasks; i++) {
+        executor.submit(
+            () -> {
+              try {
+                // Enforce that tasks pause until every task has reached this point
+                startLatch.countDown();
+                startLatch.await();
+
+                if (rl.tryAcquire()) {
+                  numAcquired.incrementAndGet();
+                }
+              } catch (InterruptedException e) {
+                // Restore the interrupt flag before surfacing the failure.
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+              } finally {
+                // Always count down so a failed task cannot leave endLatch.await() hanging.
+                endLatch.countDown();
+              }
+            });
+      }
+    }
+
+    endLatch.await();
+    Assertions.assertEquals(maxTokens, numAcquired.get());
+  }
+}
diff --git a/polaris-service/src/main/resources/META-INF/services/io.dropwizard.jackson.Discoverable b/polaris-service/src/test/resources/META-INF/services/org.apache.polaris.service.ratelimiter.RateLimiter
similarity index 67%
copy from polaris-service/src/main/resources/META-INF/services/io.dropwizard.jackson.Discoverable
copy to polaris-service/src/test/resources/META-INF/services/org.apache.polaris.service.ratelimiter.RateLimiter
index e290634..c43c88a 100644
--- a/polaris-service/src/main/resources/META-INF/services/io.dropwizard.jackson.Discoverable
+++ b/polaris-service/src/test/resources/META-INF/services/org.apache.polaris.service.ratelimiter.RateLimiter
@@ -17,10 +17,4 @@
 # under the License.
 #
 
-org.apache.polaris.service.auth.DiscoverableAuthenticator
-org.apache.polaris.core.persistence.MetaStoreManagerFactory
-org.apache.polaris.service.config.OAuth2ApiService
-org.apache.polaris.service.context.RealmContextResolver
-org.apache.polaris.service.context.CallContextResolver
-org.apache.polaris.service.auth.TokenBrokerFactory
-org.apache.polaris.service.catalog.FileIOFactory
+org.apache.polaris.service.ratelimiter.MockRealmTokenBucketRateLimiter
diff --git a/polaris-service/src/test/resources/polaris-server-integrationtest.yml b/polaris-service/src/test/resources/polaris-server-integrationtest.yml
index d6a1aac..4b430f3 100644
--- a/polaris-service/src/test/resources/polaris-server-integrationtest.yml
+++ b/polaris-service/src/test/resources/polaris-server-integrationtest.yml
@@ -152,3 +152,9 @@ logging:
 
 # Limits the size of request bodies sent to Polaris. -1 means no limit.
 maxRequestBodyBytes: 1000000
+
+# Limits the request rate per realm
+rateLimiter:
+  type: realm-token-bucket
+  requestsPerSecond: 10
+  windowSeconds: 10


Reply via email to