This is worth a design discussion in its own right, but here's my input. You're using a DAG with sensor operators to determine if something needs to be triggered. There is a gap between the sensor "ok-ing" the progression, the dag being triggered, and the first task being spun up. That gap can easily lead to a race condition, where another sensor elsewhere still sees a non-rate-limited state and may also initiate the dag. It's likely that a rate limit will result from that.
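
To make that window concrete, here is a toy walk-through of the interleaving. It is only an illustration: a plain dict stands in for redis, the function names are made up for this sketch, and the ordering of the two runs is written out by hand rather than produced by a real scheduler.

```python
# Illustration only: a dict stands in for redis and the interleaving of the
# two DAG runs is spelled out by hand instead of relying on real scheduling.

RATE_LIMIT_KEY = "rate_limited"


def sensor_passes(store):
    """What the sensor checks: no rate-limit key means 'go ahead'."""
    return RATE_LIMIT_KEY not in store


def call_api(store, api_limited):
    """Hypothetical task body: records the limit in the store when it hits one."""
    if api_limited:
        store[RATE_LIMIT_KEY] = True
        return "rate limited"
    return "ok"


def simulate():
    store = {}
    events = []

    # Both sensors poke before either task has started.
    a_ok = sensor_passes(store)
    b_ok = sensor_passes(store)
    events.append(("sensor A", a_ok))
    events.append(("sensor B", b_ok))

    # Run A starts its task, hits the limit and sets the key.
    events.append(("task A", call_api(store, api_limited=True)))

    # Run B passed its check earlier, so it calls the API regardless.
    if b_ok:
        events.append(("task B", call_api(store, api_limited=True)))

    # Only a sensor poking from now on would block.
    events.append(("sensor C", sensor_passes(store)))
    return events
```

Both sensors pass, so task B still hits the API after task A has already recorded the rate limit.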
Second, should there still be a rate limit in effect, the operators in the DAG won't respect the back-off period from there, because you already passed that check.

For that reason I'd do this slightly differently, in a more managed way. I don't have sufficient background on the business requirements or on how much adwords-related work there is in total, but here are three options to look into:

- Make the adwords hook raise an exception, RateLimitException for example, then let the operator respond to that and manage redis. First check with redis when the operator starts, then call adwords, and in case of failure update redis and probably go into the retry loop. You can set a low retry interval here, because it will check with redis anyway; that way you can support back-off periods of any resolution.

- Just use a pool with the number of required simultaneous processes and play with the variables and rates to avoid the rate limit in the first place. That way, you can maximize the API usage without creating a stampeding herd that will probably lead to failure anyway.

- Another conceivable approach is one where a dag "requests" the use of the API by inserting a record in a queue in redis, and the main dag does the actual triggering (so that all scheduling is centralized), but that's like building a scheduler in a scheduler, and in the end a pool would give you the same functionality without all the hassle.

Rgds,

Gerard

On Fri, Aug 10, 2018 at 12:41 PM Robin Edwards <r...@goshift.com> wrote:

> Thanks Gerard,
>
> Yea pools look really useful for limiting concurrent requests.
>
> Where you mention the use of a hook would you simply raise an exception
> from get_conn() should the adwords account be rate limited then just
> configure a number of retries and appropriate delay / back off on the
> operator doing work with the api?
>
> I have come up with part of a solution using a key sensor and trigger dag
> run.
> The idea would be that when my 'adwords_func' encounters a rate limit
> error it sets a key in redis with an expiry matching the period in the rate
> limit response then re-triggers the dag which will block on my sensor until
> the key has expired.
>
> The hard part is now getting this mechanism to work within a sub dag as I
> have multiple api operations that need limiting.
>
> def _adwords_rate_limited_dag(dag_id, adwords_func, max_dag_retries,
>                               **kwargs):
>     dag = DAG(dag_id, **kwargs)
>
>     def count_retries(context, obj):
>         retries = context['dag_run'].conf.get('dag_retries', 1)
>
>         if retries > max_dag_retries:
>             raise SystemError("Max retries reached for dag")
>
>         obj.payload = {'dag_retries': retries + 1}
>
>         return obj
>
>     with dag:
>         RedisNoKeySensor(
>             task_id='check_for_rate_limit',
>             key='rate_limited',
>             redis_conn_id='redis_master',
>             poke_interval=10
>         ) >> PythonOperator(
>             task_id=adwords_func.__name__,
>             python_callable=adwords_func,
>         ) >> TriggerDagRunOperator(
>             task_id='retry_dag_on_failure',
>             trigger_dag_id=dag_id,
>             trigger_rule=TriggerRule.ONE_FAILED,
>             python_callable=count_retries
>         )
>
>     return dag
>
> Thanks for your help,
>
> Rob
>
>
> On Thu, Aug 9, 2018 at 7:07 PM, Gerard Toonstra <gtoons...@gmail.com>
> wrote:
>
> > Have you looked into pools? Pools allow you to specify how many tasks at
> > any given time should use a common resource.
> > That way you could limit this to 1, 2, or 3 for example. Pools are not
> > dynamic however, so it only allows you to upper limit how many
> > number of clients are going to hit the API at any moment, not determine
> > how many when the rate limit is in effect
> > (unless.... you use code to reconfigure the pool on demand, but I'm not
> > sure if I should recommend that, i.e. reconfigure the # of clients
> > on the basis of hitting the rate limit.)
> > It sounds as if this logic is
> > best introduced at the hook level, where it determines that it passes
> > out an API interface only when the rate limit is not in place, where
> > operators specify how many retries should occur.
> >
> > The Adwords API does allow increasing the rate limit threshold though and
> > you're probably better off negotiating
> > with Google to up that threshold, explaining your business case etc.?
> >
> > Gerard
> >
> >
> >
> > On Thu, Aug 9, 2018 at 10:43 AM r...@goshift.com <r...@goshift.com> wrote:
> >
> > > Hello,
> > >
> > > I am in the process of migrating a bespoke data pipe line built around
> > > celery into airflow.
> > >
> > > We have a number of different tasks which interact with the Adwords API
> > > which has a rate limiting policy. The policy isn't a fixed number of
> > > requests its variable.
> > >
> > > In our celery code we have handled this by capturing a rate limit error
> > > response and setting a key in redis to make sure that no tasks execute
> > > against the API until it's expired. Any task that does get executed
> > > checks for the presence of the key and if the key exists issues a retry
> > > for when the rate limit is due to expire.
> > >
> > > Moving over to Airflow I can't find a way to go about scheduling a task
> > > to retry in a specific amount of time. Doing some reading it seems a
> > > Sensor could work to prevent other dags from executing whilst the rate
> > > limit is present.
> > >
> > > I also can't seem to find an example of handling different exceptions
> > > from a python task and adapting the retry logic accordingly.
> > >
> > > Any pointers would be much appreciated,
> > >
> > > Rob
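
For reference, the hook-level idea discussed in this thread (the hook raises on a rate limit, and the task checks redis before calling and updates it on failure) might be sketched roughly as below. This is a sketch under assumptions, not a working Airflow setup: InMemoryStore is a stand-in for a redis client, BackOffInEffect and run_api_task are names invented here, and the retry loop itself is left to whatever runs the callable (in Airflow, the operator's retries and retry delay).

```python
import time


class RateLimitException(Exception):
    """Raised by the (hypothetical) hook when the API reports a rate limit."""

    def __init__(self, retry_after):
        super().__init__("rate limited for %s seconds" % retry_after)
        self.retry_after = retry_after


class BackOffInEffect(Exception):
    """Raised when the store says a back-off period is still running."""


class InMemoryStore:
    """Stand-in for redis: a key with an expiry, nothing more."""

    def __init__(self, clock=time.time):
        self._clock = clock
        self._expires_at = {}

    def set_with_expiry(self, key, seconds):
        self._expires_at[key] = self._clock() + seconds

    def exists(self, key):
        expires_at = self._expires_at.get(key)
        return expires_at is not None and expires_at > self._clock()


def run_api_task(store, api_call, key="rate_limited"):
    """Body of the task callable; any exception raised here means 'retry later'."""
    # 1. Check the store first and stay away from the API during a back-off.
    if store.exists(key):
        raise BackOffInEffect(key)
    # 2. Call the API; on a rate limit, record the back-off period and re-raise.
    try:
        return api_call()
    except RateLimitException as exc:
        store.set_with_expiry(key, exc.retry_after)
        raise
```

Because every attempt consults the store before touching the API, the retry interval on the task can be short without causing extra API calls during the back-off period.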