Thanks Gerard, that's really helpful; it would have taken me some time to
pinpoint that race condition.

I will go with your suggestion and implement a hook and manage the logic
within the operator itself.
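As a starting point, here's a minimal sketch of what I'm picturing (the
AdwordsRateLimitError name, the redis key, and the bare redis.Redis() wiring
are placeholders; the real hook would wrap the googleads client and map its
rate limit error, which carries a retry-after value, onto this exception):

import redis

from airflow.exceptions import AirflowException


class AdwordsRateLimitError(AirflowException):
    """Raised by the adwords hook when the API reports a rate limit."""
    def __init__(self, retry_after_seconds):
        super(AdwordsRateLimitError, self).__init__('Adwords rate limit hit')
        self.retry_after_seconds = retry_after_seconds


def guarded_adwords_call(adwords_call, **context):
    """python_callable wrapper: checks redis first, updates it on failure."""
    conn = redis.Redis()  # in practice, built from an Airflow connection
    ttl = conn.ttl('adwords_rate_limited')
    if ttl and ttl > 0:
        # Back-off window still open: fail fast and let the task's normal
        # retries (with a short retry_delay) re-poll the redis key.
        raise AirflowException('Rate limit in effect for another %ss' % ttl)
    try:
        adwords_call()
    except AdwordsRateLimitError as e:
        # Record the back-off window so every other adwords task backs off
        # too, then re-raise so this task goes into its retry loop.
        conn.set('adwords_rate_limited', '1', ex=e.retry_after_seconds)
        raise

The operators would then just set retries high and retry_delay low, which
matches your point about supporting back-off periods of any resolution.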
Rob

On Sun, Aug 12, 2018 at 9:28 AM, Gerard Toonstra <gtoons...@gmail.com> wrote:

> This is worth a design discussion in its own right, but here's my input.
>
> You're using a DAG with sensor operators to determine whether something
> needs to be triggered. There is a window between the sensor "ok-ing" the
> progression, the dag being triggered, and the first task being spun up.
> This interval can easily lead to a race condition where another sensor
> elsewhere still sees a non-rate-limited condition and may also initiate
> the dag. It's likely a rate limit will result from that.
>
> Second, should there still be a rate limit in effect, the operators in the
> DAG won't respect the back-off period, because you passed that check
> already.
>
> For that reason I'd do this slightly differently, in a more managed way. I
> don't have sufficient background on the business requirements and how much
> adwords-related work there is in total, but here are three options to look
> into:
>
> - Make the adwords hook raise a RateLimitException, for example, then let
>   the operator respond to that and manage redis. First check with redis
>   when it starts, then call adwords, and in case of failure, update redis
>   and probably go into the retry loop. You can set a low retry interval
>   here, because it will check with redis anyway; that way you can support
>   back-off periods of any resolution.
>
> - Just use a pool with the number of required simultaneous processes and
>   play with the variables and rates to avoid the rate limit in the first
>   place (see the sketch after this list). That way, you can maximize the
>   API usage without creating a stampeding herd that will probably lead to
>   failure anyway.
>
> - There's another approach thinkable where a dag "requests" the use of the
>   API by inserting a record in a queue in redis, and the main dag does the
>   actual triggering (so that all scheduling is centralized), but that's
>   like building a scheduler inside a scheduler, and in the end a pool
>   would give you the same functionality without all the hassle.
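> Concretely, the pool option is just a matter of creating the pool up
> front and tagging every adwords task with it. A minimal sketch (the dag,
> the 'adwords_api' pool name, its slot count, and fetch_campaigns_fn are
> all made up for illustration):
>
> from datetime import datetime
>
> from airflow import DAG
> from airflow.operators.python_operator import PythonOperator
>
> dag = DAG('adwords_sync', start_date=datetime(2018, 8, 1),
>           schedule_interval='@daily')
>
> def fetch_campaigns_fn():
>     pass  # would call the adwords API via the hook here
>
> # Assumes a pool named 'adwords_api' was created beforehand (e.g. on the
> # Admin -> Pools page) with a couple of slots. Airflow will then never
> # run more than that many of these tasks at once, across all dags that
> # queue them.
> fetch_campaigns = PythonOperator(
>     task_id='fetch_campaigns',
>     python_callable=fetch_campaigns_fn,
>     pool='adwords_api',
>     dag=dag,
> )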
> Rgds,
>
> Gerard
>
> On Fri, Aug 10, 2018 at 12:41 PM Robin Edwards <r...@goshift.com> wrote:
>
> > Thanks Gerard,
> >
> > Yea, pools look really useful for limiting concurrent requests.
> >
> > Where you mention the use of a hook, would you simply raise an exception
> > from get_conn() should the adwords account be rate limited, then just
> > configure a number of retries and an appropriate delay / back-off on the
> > operator doing work with the api?
> >
> > I have come up with part of a solution using a key sensor and trigger
> > dag run. The idea is that when my 'adwords_func' encounters a rate limit
> > error, it sets a key in redis with an expiry matching the period in the
> > rate limit response, then re-triggers the dag, which will block on my
> > sensor until the key has expired.
> >
> > The hard part is now getting this mechanism to work within a sub dag, as
> > I have multiple api operations that need limiting.
> >
> > from airflow import DAG
> > from airflow.operators.python_operator import PythonOperator
> > from airflow.operators.dagrun_operator import TriggerDagRunOperator
> > from airflow.utils.trigger_rule import TriggerRule
> >
> > # RedisNoKeySensor is our own sensor; it pokes until the key is absent.
> >
> > def _adwords_rate_limited_dag(dag_id, adwords_func, max_dag_retries,
> >                               **kwargs):
> >     dag = DAG(dag_id, **kwargs)
> >
> >     def count_retries(context, obj):
> >         retries = context['dag_run'].conf.get('dag_retries', 1)
> >
> >         if retries > max_dag_retries:
> >             raise SystemError("Max retries reached for dag")
> >
> >         obj.payload = {'dag_retries': retries + 1}
> >         return obj
> >
> >     with dag:
> >         RedisNoKeySensor(
> >             task_id='check_for_rate_limit',
> >             key='rate_limited',
> >             redis_conn_id='redis_master',
> >             poke_interval=10
> >         ) >> PythonOperator(
> >             task_id=adwords_func.__name__,
> >             python_callable=adwords_func,
> >         ) >> TriggerDagRunOperator(
> >             task_id='retry_dag_on_failure',
> >             trigger_dag_id=dag_id,
> >             trigger_rule=TriggerRule.ONE_FAILED,
> >             python_callable=count_retries
> >         )
> >
> >     return dag
> >
> > Thanks for your help,
> >
> > Rob
> >
> > On Thu, Aug 9, 2018 at 7:07 PM, Gerard Toonstra <gtoons...@gmail.com>
> > wrote:
> >
> > > Have you looked into pools? Pools allow you to specify how many tasks
> > > at any given time should use a common resource. That way you could
> > > limit this to 1, 2, or 3 for example. Pools are not dynamic, however,
> > > so they only let you put an upper limit on the number of clients that
> > > will hit the API at any moment, not determine how many when the rate
> > > limit is in effect (unless... you use code to reconfigure the pool on
> > > demand, but I'm not sure I should recommend reconfiguring the # of
> > > clients on the basis of hitting the rate limit). It sounds as if this
> > > logic is best introduced at the hook level, where it hands out an API
> > > interface only when the rate limit is not in place, and operators
> > > specify how many retries should occur.
> > >
> > > The Adwords API does allow increasing the rate limit threshold,
> > > though, and you're probably better off negotiating with Google to up
> > > that threshold, explaining your business case etc.
> > >
> > > Gerard
> > >
> > > On Thu, Aug 9, 2018 at 10:43 AM r...@goshift.com <r...@goshift.com>
> > > wrote:
> > >
> > > > Hello,
> > > >
> > > > I am in the process of migrating a bespoke data pipeline built
> > > > around celery into airflow.
> > > >
> > > > We have a number of different tasks which interact with the Adwords
> > > > API, which has a rate limiting policy. The policy isn't a fixed
> > > > number of requests; it's variable.
> > > >
> > > > In our celery code we have handled this by capturing a rate limit
> > > > error response and setting a key in redis to make sure that no
> > > > tasks execute against the API until it has expired. Any task that
> > > > does get executed checks for the presence of the key and, if the
> > > > key exists, issues a retry for when the rate limit is due to
> > > > expire.
> > > >
> > > > Moving over to Airflow I can't find a way to schedule a task to
> > > > retry in a specific amount of time. Doing some reading, it seems a
> > > > Sensor could work to prevent other dags from executing whilst the
> > > > rate limit is present.
> > > >
> > > > I also can't seem to find an example of handling different
> > > > exceptions from a python task and adapting the retry logic
> > > > accordingly.
> > > >
> > > > Any pointers would be much appreciated,
> > > >
> > > > Rob