danthev commented on a change in pull request #14723:
URL: https://github.com/apache/beam/pull/14723#discussion_r637178001
##########
File path: sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio.py
##########
@@ -276,15 +277,33 @@ class _Mutate(PTransform):
Only idempotent Datastore mutation operations (upsert and delete) are
supported, as the commits are retried when failures occur.
"""
- def __init__(self, mutate_fn):
+
+ # Default hint for the expected number of workers in the ramp-up throttling
+ # step for write or delete operations.
+ _DEFAULT_HINT_NUM_WORKERS = 500
Review comment:
I've thought about this before. Generally what I would expect is a slow
start, but autoscaling quickly scaling up when the throttling limit increases,
yielding a similar result to starting with the desired number of workers.
However, I was weighing whether to report `throttlingMs` or if that stifles
autoscaling (the Firestore implementation reports it). I thought I remembered
previous test showing not much of a difference, but I've run a few more tests
and I do see a significant difference now. This is an example I ran with the
Java implementation and `maxNumWorkers` at 50, where I commented out
incrementing the `throttlingMs` counter:

That's about what I would expect, autoscaling scales up after 10-15 minutes,
then the ramp-up proceeds normally. I just ran that same example with the
original implementation, reporting `throttlingMs`, and Dataflow seems prone to
getting stuck at 1 worker with very little throughput.
So if I drop `throttlingMs`, autoscaling should be fairly normal in terms of
behavior. Are there other side effects with autoscaling I should consider?
##########
File path:
sdks/python/apache_beam/io/gcp/datastore/v1new/rampup_throttling_fn.py
##########
@@ -0,0 +1,80 @@
+import datetime
+import logging
+import time
+from typing import TypeVar
+
+from apache_beam import typehints
+from apache_beam.io.gcp.datastore.v1new import util
+from apache_beam.transforms import DoFn
+from apache_beam.utils.retry import FuzzedExponentialIntervals
+
+T = TypeVar('T')
+
+_LOG = logging.getLogger(__name__)
+
+
[email protected]_input_types(T)
[email protected]_output_types(T)
+class RampupThrottlingFn(DoFn):
+ """A ``DoFn`` that throttles ramp-up following an exponential function.
+
+ An implementation of a client-side throttler that enforces a gradual ramp-up,
+ broadly in line with Datastore best practices. See also
+ https://cloud.google.com/datastore/docs/best-practices#ramping_up_traffic.
+ """
+ def to_runner_api_parameter(self, unused_context):
+ from apache_beam.internal import pickler
+ config = {
+ 'num_workers': self._num_workers,
+ }
+ return 'beam:fn:rampup_throttling:v0', pickler.dumps(config)
+
+ _BASE_BUDGET = 500
+ _RAMP_UP_INTERVAL = datetime.timedelta(minutes=5)
+
+ def __init__(self, num_workers, *unused_args, **unused_kwargs):
+ """Initializes a ramp-up throttler transform.
+
+ Args:
+ num_workers: A hint for the expected number of workers, used to derive
+ the local rate limit.
+ """
+ super(RampupThrottlingFn, self).__init__(*unused_args, **unused_kwargs)
+ self._num_workers = num_workers
+ self._successful_ops = util.MovingSum(window_ms=1000, bucket_ms=1000)
+ self._first_instant = datetime.datetime.now()
+
+ def _calc_max_ops_budget(
+ self,
+ first_instant: datetime.datetime,
+ current_instant: datetime.datetime):
+ """Function that returns per-second budget according to best practices.
+
+ The exact function is `500 / num_shards * 1.5^max(0, (x-5)/5)`, where x is
+ the number of minutes since start time.
+ """
+ timedelta_since_first = current_instant - first_instant
+ growth = max(
+ 0.0, (timedelta_since_first - self._RAMP_UP_INTERVAL) /
+ self._RAMP_UP_INTERVAL)
+ max_ops_budget = int(self._BASE_BUDGET / self._num_workers * (1.5**growth))
Review comment:
Unless I made a mistake translating this from Java, that "first instant"
is serialized and kept for each DoFn instance, so the computed per-second
budget is across bundles, but only _per worker_. That's why I split the budget
by estimated worker count into a per-worker budget.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]