jrmccluskey commented on code in PR #37218:
URL: https://github.com/apache/beam/pull/37218#discussion_r2668857683
##########
sdks/python/apache_beam/ml/inference/base_test.py:
##########
@@ -2071,6 +2071,70 @@ def run_inference(self,
       responses.append(model.predict(example))
     return responses
+  def test_run_inference_with_rate_limiter(self):
+    class FakeRateLimiter(base.RateLimiter):
+      def __init__(self):
+        super().__init__(namespace='test_namespace')
+
+      def throttle(self, hits_added=1):
+        self.requests_counter.inc()
+        return True
Review Comment:
This is going back to the base implementation of the rate limiter, but `throttle()` returning True reads to me as "this request *should* be throttled," not as "this request is approved." See the adaptive throttler's definition of `throttle_request()`
(https://github.com/apache/beam/blob/8a9011382e54b8a39d7105d2949d8d0d079e1d3a/sdks/python/apache_beam/io/components/adaptive_throttler.py#L103)
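Rough, untested sketch of what I mean, with the boolean flipped so True means "throttle this"; the standalone `RateLimiter`/`FakeRateLimiter` pair below only mirrors the shape of the test in this PR and is not the actual Beam API:

```python
class RateLimiter:
  """Hypothetical base class, shaped like the one used in the test above."""
  def __init__(self, namespace):
    self.namespace = namespace

  def throttle(self, hits_added=1):
    """Returns True if the caller should be throttled (batch rejected)."""
    raise NotImplementedError


class FakeRateLimiter(RateLimiter):
  def __init__(self, max_hits=10):
    super().__init__(namespace='test_namespace')
    self._hits = 0
    self._max_hits = max_hits  # placeholder knob, just for the sketch

  def throttle(self, hits_added=1):
    self._hits += hits_added
    # True now means "yes, throttle this request", matching the convention
    # of AdaptiveThrottler.throttle_request().
    return self._hits > self._max_hits
```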
##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -431,6 +437,19 @@ def run_inference(
     Returns:
       An Iterable of Predictions.
     """
+    if self._rate_limiter:
+      if self._shared_rate_limiter is None:
+
+        def init_limiter():
+          return self._rate_limiter
+
+        self._shared_rate_limiter = self._shared_handle.acquire(init_limiter)
+
+      if not self._shared_rate_limiter.throttle(hits_added=len(batch)):
+        raise RuntimeError(
+            "Rate Limit Exceeded, "
+            "Could not process this batch.")
Review Comment:
I don't love using a `RuntimeError` here. It pushes users towards allowing retries on `RuntimeError` as part of their retry filter, which could mask real problems in their code (and keep it spinning when it should fail fast). It may be worth defining a custom exception type here.
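Untested sketch of the idea; `RateLimitExceededError` and `_check_rate_limit` are placeholder names, not anything defined in the PR:

```python
class RateLimitExceededError(Exception):
  """Raised when the shared rate limiter rejects a batch."""


def _check_rate_limit(shared_rate_limiter, batch):
  # Assumes throttle() returns True when the batch may proceed, as in the
  # current diff. A dedicated exception lets users retry on exactly this
  # condition without also retrying every RuntimeError.
  if not shared_rate_limiter.throttle(hits_added=len(batch)):
    raise RateLimitExceededError(
        'Rate limit exceeded, could not process batch of '
        f'{len(batch)} elements.')
```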