This is an automated email from the ASF dual-hosted git repository.

jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d0223389f47 update RateLimiter execution function name (#37287)
d0223389f47 is described below

commit d0223389f47f8085477e113391d7ae5961bc0bbc
Author: Tarun Annapareddy <[email protected]>
AuthorDate: Tue Jan 13 00:40:47 2026 +0530

    update RateLimiter execution function name (#37287)
---
 .../apache_beam/examples/rate_limiter_simple.py    |  2 +-
 .../apache_beam/io/components/rate_limiter.py      |  4 ++--
 .../apache_beam/io/components/rate_limiter_test.py | 24 +++++++++++-----------
 sdks/python/apache_beam/ml/inference/base.py       |  2 +-
 sdks/python/apache_beam/ml/inference/base_test.py  |  4 ++--
 5 files changed, 18 insertions(+), 18 deletions(-)

diff --git a/sdks/python/apache_beam/examples/rate_limiter_simple.py b/sdks/python/apache_beam/examples/rate_limiter_simple.py
index ea469006f2b..8cdf1166aad 100644
--- a/sdks/python/apache_beam/examples/rate_limiter_simple.py
+++ b/sdks/python/apache_beam/examples/rate_limiter_simple.py
@@ -53,7 +53,7 @@ class SampleApiDoFn(beam.DoFn):
     self.rate_limiter = self._shared.acquire(init_limiter)
 
   def process(self, element):
-    self.rate_limiter.throttle()
+    self.rate_limiter.allow()
 
     # Process the element mock API call
     logging.info("Processing element: %s", element)
diff --git a/sdks/python/apache_beam/io/components/rate_limiter.py b/sdks/python/apache_beam/io/components/rate_limiter.py
index 5c3b36e8ab0..2dc8a5340fd 100644
--- a/sdks/python/apache_beam/io/components/rate_limiter.py
+++ b/sdks/python/apache_beam/io/components/rate_limiter.py
@@ -61,7 +61,7 @@ class RateLimiter(abc.ABC):
     self.rpc_latency = Metrics.distribution(namespace, 'RatelimitRpcLatencyMs')
 
   @abc.abstractmethod
-  def throttle(self, **kwargs) -> bool:
+  def allow(self, **kwargs) -> bool:
     """Applies rate limiting to the request.
 
     This method checks if the request is permitted by the rate limiting policy.
@@ -148,7 +148,7 @@ class EnvoyRateLimiter(RateLimiter):
           channel = grpc.insecure_channel(self.service_address)
           self._stub = EnvoyRateLimiter.RateLimitServiceStub(channel)
 
-  def throttle(self, hits_added: int = 1) -> bool:
+  def allow(self, hits_added: int = 1) -> bool:
     """Calls the Envoy RLS to apply rate limits.
 
     Sends a rate limit request to the configured Envoy Rate Limit Service.
diff --git a/sdks/python/apache_beam/io/components/rate_limiter_test.py b/sdks/python/apache_beam/io/components/rate_limiter_test.py
index 7c3e7b82aad..24d30a1c5c9 100644
--- a/sdks/python/apache_beam/io/components/rate_limiter_test.py
+++ b/sdks/python/apache_beam/io/components/rate_limiter_test.py
@@ -42,7 +42,7 @@ class EnvoyRateLimiterTest(unittest.TestCase):
         namespace='test_namespace')
 
   @mock.patch('grpc.insecure_channel')
-  def test_throttle_allowed(self, mock_channel):
+  def test_allow_success(self, mock_channel):
     # Mock successful OK response
     mock_stub = mock.Mock()
     mock_response = RateLimitResponse(overall_code=RateLimitResponseCode.OK)
@@ -51,13 +51,13 @@ class EnvoyRateLimiterTest(unittest.TestCase):
     # Inject mock stub
     self.limiter._stub = mock_stub
 
-    throttled = self.limiter.throttle()
+    allowed = self.limiter.allow()
 
-    self.assertTrue(throttled)
+    self.assertTrue(allowed)
     mock_stub.ShouldRateLimit.assert_called_once()
 
   @mock.patch('grpc.insecure_channel')
-  def test_throttle_over_limit_retries_exceeded(self, mock_channel):
+  def test_allow_over_limit_retries_exceeded(self, mock_channel):
     # Mock OVER_LIMIT response
     mock_stub = mock.Mock()
     mock_response = RateLimitResponse(
@@ -69,9 +69,9 @@ class EnvoyRateLimiterTest(unittest.TestCase):
 
     # We mock time.sleep to run fast
     with mock.patch('time.sleep'):
-      throttled = self.limiter.throttle()
+      allowed = self.limiter.allow()
 
-    self.assertFalse(throttled)
+    self.assertFalse(allowed)
     # Should be called 1 (initial) + 2 (retries) + 1 (last check > retries
     # logic depends on loop)
     # Logic: attempt starts at 0.
@@ -83,7 +83,7 @@ class EnvoyRateLimiterTest(unittest.TestCase):
     self.assertEqual(mock_stub.ShouldRateLimit.call_count, 3)
 
   @mock.patch('grpc.insecure_channel')
-  def test_throttle_rpc_error_retry(self, mock_channel):
+  def test_allow_rpc_error_retry(self, mock_channel):
     # Mock RpcError then Success
     mock_stub = mock.Mock()
     mock_response = RateLimitResponse(overall_code=RateLimitResponseCode.OK)
@@ -95,13 +95,13 @@ class EnvoyRateLimiterTest(unittest.TestCase):
     self.limiter._stub = mock_stub
 
     with mock.patch('time.sleep'):
-      throttled = self.limiter.throttle()
+      allowed = self.limiter.allow()
 
-    self.assertTrue(throttled)
+    self.assertTrue(allowed)
     self.assertEqual(mock_stub.ShouldRateLimit.call_count, 3)
 
   @mock.patch('grpc.insecure_channel')
-  def test_throttle_rpc_error_fail(self, mock_channel):
+  def test_allow_rpc_error_fail(self, mock_channel):
     # Mock Persistent RpcError
     mock_stub = mock.Mock()
     error = grpc.RpcError()
@@ -111,7 +111,7 @@ class EnvoyRateLimiterTest(unittest.TestCase):
 
     with mock.patch('time.sleep'):
       with self.assertRaises(grpc.RpcError):
-        self.limiter.throttle()
+        self.limiter.allow()
 
     # The inner loop tries 5 times for connection errors
     self.assertEqual(mock_stub.ShouldRateLimit.call_count, 5)
@@ -134,7 +134,7 @@ class EnvoyRateLimiterTest(unittest.TestCase):
     self.limiter.retries = 0  # Single attempt
 
     with mock.patch('time.sleep') as mock_sleep:
-      self.limiter.throttle()
+      self.limiter.allow()
       # Should sleep for 5 seconds (jitter is 0.0)
       mock_sleep.assert_called_with(5.0)
 
diff --git a/sdks/python/apache_beam/ml/inference/base.py b/sdks/python/apache_beam/ml/inference/base.py
index ada7cb3237d..da7b363052d 100644
--- a/sdks/python/apache_beam/ml/inference/base.py
+++ b/sdks/python/apache_beam/ml/inference/base.py
@@ -450,7 +450,7 @@ class RemoteModelHandler(ABC, ModelHandler[ExampleT, PredictionT, ModelT]):
 
         self._shared_rate_limiter = self._shared_handle.acquire(init_limiter)
 
-      if not self._shared_rate_limiter.throttle(hits_added=len(batch)):
+      if not self._shared_rate_limiter.allow(hits_added=len(batch)):
         raise RateLimitExceeded(
             "Rate Limit Exceeded, "
             "Could not process this batch.")
diff --git a/sdks/python/apache_beam/ml/inference/base_test.py b/sdks/python/apache_beam/ml/inference/base_test.py
index e6865a13ef8..381bf545660 100644
--- a/sdks/python/apache_beam/ml/inference/base_test.py
+++ b/sdks/python/apache_beam/ml/inference/base_test.py
@@ -2076,7 +2076,7 @@ class RunInferenceRemoteTest(unittest.TestCase):
       def __init__(self):
         super().__init__(namespace='test_namespace')
 
-      def throttle(self, hits_added=1):
+      def allow(self, hits_added=1):
         self.requests_counter.inc()
         return True
 
@@ -2114,7 +2114,7 @@ class RunInferenceRemoteTest(unittest.TestCase):
       def __init__(self):
         super().__init__(namespace='test_namespace')
 
-      def throttle(self, hits_added=1):
+      def allow(self, hits_added=1):
         return False
 
     class ConcreteRemoteModelHandler(base.RemoteModelHandler):

Reply via email to