This is an automated email from the ASF dual-hosted git repository.
jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d0223389f47 update RateLimiter execution function name (#37287)
d0223389f47 is described below
commit d0223389f47f8085477e113391d7ae5961bc0bbc
Author: Tarun Annapareddy <[email protected]>
AuthorDate: Tue Jan 13 00:40:47 2026 +0530
update RateLimiter execution function name (#37287)
---
.../apache_beam/examples/rate_limiter_simple.py | 2 +-
.../apache_beam/io/components/rate_limiter.py | 4 ++--
.../apache_beam/io/components/rate_limiter_test.py | 24 +++++++++++-----------
sdks/python/apache_beam/ml/inference/base.py | 2 +-
sdks/python/apache_beam/ml/inference/base_test.py | 4 ++--
5 files changed, 18 insertions(+), 18 deletions(-)
diff --git a/sdks/python/apache_beam/examples/rate_limiter_simple.py
b/sdks/python/apache_beam/examples/rate_limiter_simple.py
index ea469006f2b..8cdf1166aad 100644
--- a/sdks/python/apache_beam/examples/rate_limiter_simple.py
+++ b/sdks/python/apache_beam/examples/rate_limiter_simple.py
@@ -53,7 +53,7 @@ class SampleApiDoFn(beam.DoFn):
self.rate_limiter = self._shared.acquire(init_limiter)
def process(self, element):
- self.rate_limiter.throttle()
+ self.rate_limiter.allow()
# Process the element mock API call
logging.info("Processing element: %s", element)
diff --git a/sdks/python/apache_beam/io/components/rate_limiter.py
b/sdks/python/apache_beam/io/components/rate_limiter.py
index 5c3b36e8ab0..2dc8a5340fd 100644
--- a/sdks/python/apache_beam/io/components/rate_limiter.py
+++ b/sdks/python/apache_beam/io/components/rate_limiter.py
@@ -61,7 +61,7 @@ class RateLimiter(abc.ABC):
self.rpc_latency = Metrics.distribution(namespace, 'RatelimitRpcLatencyMs')
@abc.abstractmethod
- def throttle(self, **kwargs) -> bool:
+ def allow(self, **kwargs) -> bool:
"""Applies rate limiting to the request.
This method checks if the request is permitted by the rate limiting policy.
@@ -148,7 +148,7 @@ class EnvoyRateLimiter(RateLimiter):
channel = grpc.insecure_channel(self.service_address)
self._stub = EnvoyRateLimiter.RateLimitServiceStub(channel)
- def throttle(self, hits_added: int = 1) -> bool:
+ def allow(self, hits_added: int = 1) -> bool:
"""Calls the Envoy RLS to apply rate limits.
Sends a rate limit request to the configured Envoy Rate Limit Service.
diff --git a/sdks/python/apache_beam/io/components/rate_limiter_test.py
b/sdks/python/apache_beam/io/components/rate_limiter_test.py
index 7c3e7b82aad..24d30a1c5c9 100644
--- a/sdks/python/apache_beam/io/components/rate_limiter_test.py
+++ b/sdks/python/apache_beam/io/components/rate_limiter_test.py
@@ -42,7 +42,7 @@ class EnvoyRateLimiterTest(unittest.TestCase):
namespace='test_namespace')
@mock.patch('grpc.insecure_channel')
- def test_throttle_allowed(self, mock_channel):
+ def test_allow_success(self, mock_channel):
# Mock successful OK response
mock_stub = mock.Mock()
mock_response = RateLimitResponse(overall_code=RateLimitResponseCode.OK)
@@ -51,13 +51,13 @@ class EnvoyRateLimiterTest(unittest.TestCase):
# Inject mock stub
self.limiter._stub = mock_stub
- throttled = self.limiter.throttle()
+ allowed = self.limiter.allow()
- self.assertTrue(throttled)
+ self.assertTrue(allowed)
mock_stub.ShouldRateLimit.assert_called_once()
@mock.patch('grpc.insecure_channel')
- def test_throttle_over_limit_retries_exceeded(self, mock_channel):
+ def test_allow_over_limit_retries_exceeded(self, mock_channel):
# Mock OVER_LIMIT response
mock_stub = mock.Mock()
mock_response = RateLimitResponse(
@@ -69,9 +69,9 @@ class EnvoyRateLimiterTest(unittest.TestCase):
# We mock time.sleep to run fast
with mock.patch('time.sleep'):
- throttled = self.limiter.throttle()
+ allowed = self.limiter.allow()
- self.assertFalse(throttled)
+ self.assertFalse(allowed)
# Should be called 1 (initial) + 2 (retries) + 1 (last check > retries
# logic depends on loop)
# Logic: attempt starts at 0.
@@ -83,7 +83,7 @@ class EnvoyRateLimiterTest(unittest.TestCase):
self.assertEqual(mock_stub.ShouldRateLimit.call_count, 3)
@mock.patch('grpc.insecure_channel')
- def test_throttle_rpc_error_retry(self, mock_channel):
+ def test_allow_rpc_error_retry(self, mock_channel):
# Mock RpcError then Success
mock_stub = mock.Mock()
mock_response = RateLimitResponse(overall_code=RateLimitResponseCode.OK)
@@ -95,13 +95,13 @@ class EnvoyRateLimiterTest(unittest.TestCase):
self.limiter._stub = mock_stub
with mock.patch('time.sleep'):
- throttled = self.limiter.throttle()
+ allowed = self.limiter.allow()
- self.assertTrue(throttled)
+ self.assertTrue(allowed)
self.assertEqual(mock_stub.ShouldRateLimit.call_count, 3)
@mock.patch('grpc.insecure_channel')
- def test_throttle_rpc_error_fail(self, mock_channel):
+ def test_allow_rpc_error_fail(self, mock_channel):
# Mock Persistent RpcError
mock_stub = mock.Mock()
error = grpc.RpcError()
@@ -111,7 +111,7 @@ class EnvoyRateLimiterTest(unittest.TestCase):
with mock.patch('time.sleep'):
with self.assertRaises(grpc.RpcError):
- self.limiter.throttle()
+ self.limiter.allow()
# The inner loop tries 5 times for connection errors
self.assertEqual(mock_stub.ShouldRateLimit.call_count, 5)
@@ -134,7 +134,7 @@ class EnvoyRateLimiterTest(unittest.TestCase):
self.limiter.retries = 0 # Single attempt
with mock.patch('time.sleep') as mock_sleep:
- self.limiter.throttle()
+ self.limiter.allow()
# Should sleep for 5 seconds (jitter is 0.0)
mock_sleep.assert_called_with(5.0)
diff --git a/sdks/python/apache_beam/ml/inference/base.py
b/sdks/python/apache_beam/ml/inference/base.py
index ada7cb3237d..da7b363052d 100644
--- a/sdks/python/apache_beam/ml/inference/base.py
+++ b/sdks/python/apache_beam/ml/inference/base.py
@@ -450,7 +450,7 @@ class RemoteModelHandler(ABC, ModelHandler[ExampleT,
PredictionT, ModelT]):
self._shared_rate_limiter = self._shared_handle.acquire(init_limiter)
- if not self._shared_rate_limiter.throttle(hits_added=len(batch)):
+ if not self._shared_rate_limiter.allow(hits_added=len(batch)):
raise RateLimitExceeded(
"Rate Limit Exceeded, "
"Could not process this batch.")
diff --git a/sdks/python/apache_beam/ml/inference/base_test.py
b/sdks/python/apache_beam/ml/inference/base_test.py
index e6865a13ef8..381bf545660 100644
--- a/sdks/python/apache_beam/ml/inference/base_test.py
+++ b/sdks/python/apache_beam/ml/inference/base_test.py
@@ -2076,7 +2076,7 @@ class RunInferenceRemoteTest(unittest.TestCase):
def __init__(self):
super().__init__(namespace='test_namespace')
- def throttle(self, hits_added=1):
+ def allow(self, hits_added=1):
self.requests_counter.inc()
return True
@@ -2114,7 +2114,7 @@ class RunInferenceRemoteTest(unittest.TestCase):
def __init__(self):
super().__init__(namespace='test_namespace')
- def throttle(self, hits_added=1):
+ def allow(self, hits_added=1):
return False
class ConcreteRemoteModelHandler(base.RemoteModelHandler):