SamWheating commented on issue #37621:
URL: https://github.com/apache/airflow/issues/37621#issuecomment-1967305008

   From my understanding, the worker process will sometimes (depending on DAG 
serialization, etc.) need to reprocess DAG files. This means that the task and 
DAG policies can be reapplied at run time, since they are applied every time a 
file is parsed.
   
   > Is there any way for policies not to run on workers, by configuration?
   
   There are definitely some (slightly hacky) ways around this: you could add 
some logic to your policy to exit early when it detects that it's running on a 
worker. A couple of ideas come to mind:
   
   
   1) Check the DAG parsing context to see whether the policy is being run 
within a worker process:
   ```python
   from airflow.utils.dag_parsing_context import get_parsing_context

   def task_policy(task) -> None:
       # task_id is only populated when a worker re-parses the file at
       # task execution time; it is None during regular scheduled parsing.
       if get_parsing_context().task_id is not None:
           return

       print("Running the rest of the policy..")
   ```
   
   2) Set a variable in your Kubernetes executor pod template file, and then 
check for it in your policy:
   ```python
   import os

   def task_policy(task) -> None:
       # The variable is only set in the worker pod template, so its
       # presence means we are running inside a worker pod.
       if os.environ.get('YOUR_VARIABLE_HERE') is not None:
           return

       print("Running the rest of the policy..")
   ```
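   If it helps, that guard can be factored into a small helper that is easy to 
unit-test locally (`YOUR_VARIABLE_HERE` is just a placeholder here, as above — 
pick your own name and set it only in the worker pod template):
   ```python
   import os

   # Placeholder name; set this variable in the worker pod template
   # (and nowhere else) so parse-time runs don't see it.
   WORKER_MARKER = "YOUR_VARIABLE_HERE"

   def should_skip_policy() -> bool:
       """True when the marker variable from the worker pod template is set."""
       return os.environ.get(WORKER_MARKER) is not None
   ```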
   
   I guess we could add some sort of configuration to automate this, but in my 
opinion running policies at parse time is expected behaviour, and it shouldn't 
introduce additional complexity when running on executors, aside from rare 
cases like this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
