turbaszek edited a comment on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636334773


   I did a simple test:
   ```python
   from datetime import datetime
   
   from airflow import DAG
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.utils.decorators import apply_defaults
   from scripts.perf.perf_kit import python, memory, repeat_and_time
   
   fields = [f"field{i}" for i in range(30)]
   
   
   class CustomOp(DummyOperator):
       template_fields = fields  # 30 template fields
   
       @apply_defaults
       def __init__(self, *args, **kwargs):
           super().__init__(*args, task_id=kwargs["task_id"])
           for key in kwargs:
               if key.startswith("field"):
                   setattr(self, key, kwargs[key])
   
   
   if __name__ == '__main__':
       N = 10
       OP_N = 100
   
       @repeat_and_time.timing(N)
       @repeat_and_time.repeat(N)
       def case():
           with DAG("xcomargs_test", default_args={"start_date": datetime.today()}):
               op1 = DummyOperator(task_id="op1")
               for i in range(OP_N):
                   kwargs = {k: op1.output for k in fields}
                   CustomOp(task_id=f"task_{i}", **kwargs)
   
       print("OPs: ", OP_N)
       case()
   ```
   Average time for 100 OPs in a single DAG:
   - metaclass: 471.470 ms
   - setattr + if: 7072.531 ms
   - setattr: 7208.424 ms
   
   So using `setattr` is about **14x** slower than the metaclass (7072 ms vs. 471 ms, roughly 15 times the runtime)...
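
The slowdown can be checked directly from the three averages reported above. This is only a small arithmetic sketch; the dictionary and variable names are made up for illustration and are not part of the PR.

```python
# Average timings in milliseconds, copied from the results listed above.
timings_ms = {
    "metaclass": 471.470,
    "setattr + if": 7072.531,
    "setattr": 7208.424,
}

baseline = timings_ms["metaclass"]

# Runtime relative to the metaclass approach (1.0 means "same speed").
ratios = {name: t / baseline for name, t in timings_ms.items()}

# Extra time relative to the baseline, i.e. "how many times slower".
slowdowns = {name: r - 1 for name, r in ratios.items()}

for name in timings_ms:
    print(f"{name}: {ratios[name]:.1f}x runtime, {slowdowns[name]:.1f}x slower")
```

Both `setattr` variants take about 15 times as long as the metaclass version, which matches the "14 times slower" reading when the baseline itself is subtracted.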
   
   But to make everything work smoothly, we can mix the two approaches:
   ```python
   class BaseOperatorMeta(type):
       def __call__(cls, *args, **kwargs):
           obj: BaseOperator = type.__call__(cls, *args, **kwargs)
           # Set upstream tasks defined by XComArgs passed to template fields of an operator
           obj.set_xcomargs_dependencies()
           obj._instantiated = True
           return obj
   
   
   class BaseOperator(Operator, LoggingMixin, metaclass=BaseOperatorMeta):
       _instantiated = False
   
       def __setattr__(self, key, value):
           super().__setattr__(key, value)
           if self._instantiated and key in self.template_fields:
               self.set_xcomargs_dependencies()
   ```
   
   This gives the following result: **471.381 ms**
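
To see why the mixed approach stays cheap, here is a minimal standalone sketch of the same pattern without Airflow. The names `OperatorMeta`, `Operator`, `resolve_dependencies` and `resolve_calls` are hypothetical stand-ins (the real classes are `BaseOperatorMeta` and `BaseOperator`, and the real hook is `set_xcomargs_dependencies()`); the counter only exists to show how often the hook runs.

```python
class OperatorMeta(type):
    """Hypothetical stand-in for BaseOperatorMeta."""

    def __call__(cls, *args, **kwargs):
        # type.__call__ runs __new__ and __init__ of the concrete class.
        obj = super().__call__(*args, **kwargs)
        # Resolve dependencies once, after __init__ has set every field.
        obj.resolve_dependencies()
        obj._instantiated = True
        return obj


class Operator(metaclass=OperatorMeta):
    """Hypothetical stand-in for BaseOperator."""

    template_fields = ("a", "b")
    _instantiated = False
    resolve_calls = 0  # demo-only counter, not part of the original code

    def __init__(self, a=None, b=None):
        # These assignments happen before _instantiated is True,
        # so __setattr__ does not trigger the hook here.
        self.a = a
        self.b = b
        self.other = "x"

    def __setattr__(self, key, value):
        super().__setattr__(key, value)
        # Only re-resolve for template fields changed after construction.
        if self._instantiated and key in self.template_fields:
            self.resolve_dependencies()

    def resolve_dependencies(self):
        # Placeholder for the (expensive) dependency resolution.
        self.resolve_calls += 1


op = Operator(a=1, b=2)
print(op.resolve_calls)  # hook ran once, from the metaclass
op.other = "y"           # not a template field: hook is skipped
op.a = 3                 # template field changed later: hook runs again
print(op.resolve_calls)
```

During `__init__` the hook is never triggered from `__setattr__`, so construction costs one resolution per operator, the same as the pure metaclass version. Later reassignments of template fields are still picked up.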
   
   Edit: additionally, we should add a `_locked_for_execution` flag to
BaseOperator and set it to True before executing. Then adjust `__setattr__` to:
   ```python
       def __setattr__(self, key, value):
           super().__setattr__(key, value)
           if (
               self._instantiated
               and not self._locked_for_execution
               and key in self.template_fields
           ):
               self.set_xcomargs_dependencies()
   ```
   Why? We don't want to run any custom action (including resolving upstream
tasks) during execution, and someone may well assign or reassign a value to a
template field attribute in `execute()`.
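
A minimal standalone sketch of the lock flag, again without Airflow. `LockableOperator`, `set_dependencies`, `resolved` and `run` are hypothetical names used for illustration, and `_instantiated` is set at the end of `__init__` here only to keep the example short (in the proposal above the metaclass does that).

```python
class LockableOperator:
    """Hypothetical operator showing the _locked_for_execution guard."""

    template_fields = ("sql",)
    _instantiated = False
    _locked_for_execution = False

    def __init__(self, sql):
        self.resolved = 0  # demo-only counter of dependency resolutions
        self.sql = sql
        # Simplification: the proposal sets this flag in the metaclass.
        self._instantiated = True

    def __setattr__(self, key, value):
        super().__setattr__(key, value)
        if (
            self._instantiated
            and not self._locked_for_execution
            and key in self.template_fields
        ):
            self.set_dependencies()

    def set_dependencies(self):
        # Placeholder for set_xcomargs_dependencies().
        self.resolved += 1

    def execute(self):
        # Reassigning a template field at run time must not touch the DAG.
        self.sql = self.sql + " LIMIT 1"


def run(op):
    """Hypothetical runner: lock the operator, then execute it."""
    op._locked_for_execution = True
    op.execute()


op = LockableOperator("SELECT 1")
op.sql = "SELECT 2"   # before execution: dependencies are re-resolved
run(op)               # during execution: the assignment is a plain setattr
print(op.resolved, op.sql)
```

The flag is checked on `self`, so a class-level default of `False` is enough and each instance is locked independently once it starts executing.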

