Thanks Gerard, yeah, pools look really useful for limiting concurrent requests.
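If I've understood the docs right, that would just be a matter of creating an 'adwords_api' pool in the UI (or via the airflow pool CLI) and pointing each task at it, something like this (names made up):

    from airflow.operators.python_operator import PythonOperator

    fetch_campaigns = PythonOperator(
        task_id='fetch_campaigns',
        python_callable=fetch_campaigns_from_adwords,  # whatever actually calls the API
        # Every task sharing this pool competes for its slots, so the number of
        # concurrent Adwords calls is capped by the pool size set in the UI.
        pool='adwords_api',
        dag=dag,
    )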
Where you mention the use of a hook, would you simply raise an exception from get_conn() should the Adwords account be rate limited, and then configure a number of retries and an appropriate delay/back-off on the operator doing work with the API?

I have come up with part of a solution using a key sensor and a triggered DAG run. The idea is that when my 'adwords_func' encounters a rate limit error, it sets a key in Redis with an expiry matching the period in the rate limit response and then re-triggers the DAG, which will block on my sensor until the key has expired. The hard part now is getting this mechanism to work within a SubDAG, as I have multiple API operations that need limiting.

    from airflow import DAG
    from airflow.operators.dagrun_operator import TriggerDagRunOperator
    from airflow.operators.python_operator import PythonOperator
    from airflow.utils.trigger_rule import TriggerRule

    # RedisNoKeySensor is our own sensor: it pokes Redis and only succeeds
    # once the given key is absent, i.e. once the rate limit has expired.


    def _adwords_rate_limited_dag(dag_id, adwords_func, max_dag_retries, **kwargs):
        dag = DAG(dag_id, **kwargs)

        def count_retries(context, obj):
            # Cap how many times the DAG is allowed to re-trigger itself.
            retries = context['dag_run'].conf.get('dag_retries', 1)
            if retries > max_dag_retries:
                raise SystemError("Max retries reached for dag")
            obj.payload = {'dag_retries': retries + 1}
            return obj

        with dag:
            RedisNoKeySensor(
                task_id='check_for_rate_limit',
                key='rate_limited',
                redis_conn_id='redis_master',
                poke_interval=10
            ) >> PythonOperator(
                task_id=adwords_func.__name__,
                python_callable=adwords_func,
            ) >> TriggerDagRunOperator(
                task_id='retry_dag_on_failure',
                trigger_dag_id=dag_id,
                # Only re-trigger the DAG when the API task fails.
                trigger_rule=TriggerRule.ONE_FAILED,
                python_callable=count_retries
            )

        return dag

Thanks for your help,

Rob

On Thu, Aug 9, 2018 at 7:07 PM, Gerard Toonstra <gtoons...@gmail.com> wrote:

> Have you looked into pools? Pools allow you to specify how many tasks at any given time should use a common resource. That way you could limit this to 1, 2, or 3 for example.
> Pools are not dynamic however, so they only let you put an upper limit on how many clients are going to hit the API at any moment, not determine how many when the rate limit is in effect
> (unless.... you use code to reconfigure the pool on demand, but I'm not sure if I should recommend that, i.e. reconfiguring the # of clients on the basis of hitting the rate limit).
> It sounds as if this logic is best introduced at the hook level, where it hands out an API interface only when the rate limit is not in place, and where operators specify how many retries should occur.
>
> The Adwords API does allow increasing the rate limit threshold though, and you're probably better off negotiating with Google to up that threshold, explaining your business case etc.?
>
> Gerard
>
>
>
> On Thu, Aug 9, 2018 at 10:43 AM r...@goshift.com <r...@goshift.com> wrote:
>
> > Hello,
> >
> > I am in the process of migrating a bespoke data pipeline built around celery into Airflow.
> >
> > We have a number of different tasks which interact with the Adwords API, which has a rate limiting policy. The policy isn't a fixed number of requests; it's variable.
> >
> > In our celery code we have handled this by capturing a rate limit error response and setting a key in Redis to make sure that no tasks execute against the API until it has expired. Any task that does get executed checks for the presence of the key and, if the key exists, issues a retry for when the rate limit is due to expire.
> >
> > Moving over to Airflow I can't find a way to go about scheduling a task to retry in a specific amount of time. Doing some reading it seems a Sensor could work to prevent other DAGs from executing whilst the rate limit is present.
> >
> > I also can't seem to find an example of handling different exceptions from a Python task and adapting the retry logic accordingly.
> >
> > Any pointers would be much appreciated,
> >
> > Rob
> >
>
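For reference, a rough and untested sketch of the hook-level approach described above: get_conn() refuses to hand out an Adwords client while a rate-limit key is present in Redis, the task records the limit's expiry when the API reports one, and ordinary task retries with exponential back-off handle the waiting. The hook class, the RateExceededError placeholder, the Redis key name and the connection IDs below are all made-up names, not part of Airflow or the Adwords client.

    from datetime import timedelta

    import redis
    from airflow.exceptions import AirflowException
    from airflow.hooks.base_hook import BaseHook
    from airflow.operators.python_operator import PythonOperator


    class RateExceededError(Exception):
        """Placeholder; substitute the real rate-limit exception raised by the Adwords client."""


    class AdwordsRateLimitHook(BaseHook):
        """Hands out an Adwords client only when no rate limit is in force."""

        def __init__(self, redis_conn_id='redis_master', rate_limit_key='rate_limited'):
            self.redis_conn_id = redis_conn_id
            self.rate_limit_key = rate_limit_key

        def _redis(self):
            conn = self.get_connection(self.redis_conn_id)
            return redis.StrictRedis(host=conn.host, port=conn.port or 6379)

        def get_conn(self):
            client = self._redis()
            if client.exists(self.rate_limit_key):
                # Failing here lets the task's own retry/back-off settings handle the wait.
                raise AirflowException(
                    'Adwords rate limit in force for another %s seconds'
                    % client.ttl(self.rate_limit_key))
            # Build and return the real Adwords API client here (omitted in this sketch).
            return None

        def record_rate_limit(self, retry_after_seconds):
            # The key expires when the limit does, so get_conn() starts working again.
            self._redis().setex(self.rate_limit_key, retry_after_seconds, 1)


    def call_adwords(**context):
        hook = AdwordsRateLimitHook()
        api = hook.get_conn()
        try:
            pass  # do the real API work with `api`
        except RateExceededError as exc:
            # Remember the limit for every other task, then fail so this task retries later.
            hook.record_rate_limit(getattr(exc, 'retry_after_seconds', 60))
            raise


    adwords_task = PythonOperator(
        task_id='call_adwords',
        python_callable=call_adwords,
        provide_context=True,
        retries=5,
        retry_delay=timedelta(minutes=5),
        retry_exponential_backoff=True,
        max_retry_delay=timedelta(hours=1),
        dag=dag,  # assumes a `dag` object in scope
    )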