[GitHub] [airflow] turbaszek edited a comment on pull request #8805: Resolve upstream tasks when template field is XComArg
turbaszek edited a comment on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636777316

> > Why? We don't want to do any custom action (including resolving upstream) during execution. And it may happen that someone will assign/reassign value to template field attribute in `execute()`
>
> This is in fact _exactly_ what `task.render_template_fields()` does, so yes, we do want to set something like this.

@ashb true, but I think we don't want to set upstreams when executing a task, right?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] turbaszek edited a comment on pull request #8805: Resolve upstream tasks when template field is XComArg
turbaszek edited a comment on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636334773

I did a simple test:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.decorators import apply_defaults
from scripts.perf.perf_kit import repeat_and_time

fields = [f"field{i}" for i in range(30)]


class CustomOp(DummyOperator):
    template_fields = fields  # 30 template fields

    @apply_defaults
    def __init__(self, *args, **kwargs):
        super().__init__(*args, task_id=kwargs["task_id"])
        for key in kwargs:
            if key.startswith("field"):
                setattr(self, key, kwargs[key])


if __name__ == '__main__':
    N = 10
    OP_N = 100

    @repeat_and_time.timing(N)
    @repeat_and_time.repeat(N)
    def case():
        with DAG("xcomargs_test", default_args={"start_date": datetime.today()}):
            op1 = DummyOperator(task_id="op1")
            for i in range(OP_N):
                kwargs = {k: op1.output for k in fields}
                CustomOp(task_id=f"task_{i}", **kwargs)

    print("OPs: ", OP_N)
    case()
```

Average time for 100 OPs in a single DAG:

- metaclass: 471.470 ms
- setattr + if: 7072.531 ms
- setattr: 7208.424 ms

So using setattr is roughly **15** times slower than the metaclass... But to make everything work smoothly we can mix those two approaches:

```python
class BaseOperatorMeta(type):
    def __call__(cls, *args, **kwargs):
        obj: BaseOperator = type.__call__(cls, *args, **kwargs)
        # Set upstream task defined by XComArgs passed to template fields of an operator
        obj.set_xcomargs_dependencies()
        obj._instantiated = True
        return obj


class BaseOperator(Operator, LoggingMixin, metaclass=BaseOperatorMeta):
    _instantiated = False

    def __setattr__(self, key, value):
        super().__setattr__(key, value)
        if self._instantiated and key in self.template_fields:
            self.set_xcomargs_dependencies()
```

This gives the following result: **471.381 ms**

Edit: additionally, we should add a `_locked_for_execution` flag to BaseOperator and set it to True before executing. Then adjust `__setattr__` to:

```python
def __setattr__(self, key, value):
    super().__setattr__(key, value)
    if self._instantiated and not self._locked_for_execution and key in self.template_fields:
        self.set_xcomargs_dependencies()
```

Why? We don't want to do any custom action (including resolving upstream) during execution. And it may happen that someone will assign/reassign value to template field attribute in `execute()`.
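A minimal, self-contained sketch of the combined approach, including the proposed `_locked_for_execution` flag, can help check the intended behaviour without Airflow installed. Everything here is a toy stand-in: `XComArg`, `Operator`, `CustomOp`, `run()` and the `resolve_calls` counter are hypothetical names for illustration, not Airflow's API. In the toy, `run()` sets the lock just before `execute()`; where a real task runner would set it is an assumption.

```python
class XComArg:
    """Hypothetical stand-in for Airflow's XComArg: remembers which operator produces it."""

    def __init__(self, operator):
        self.operator = operator


class OperatorMeta(type):
    def __call__(cls, *args, **kwargs):
        obj = super().__call__(*args, **kwargs)  # runs __new__ and __init__
        obj.set_xcomargs_dependencies()  # resolve once, after all fields are set
        obj._instantiated = True
        return obj


class Operator(metaclass=OperatorMeta):
    """Toy operator combining the metaclass hook with a guarded __setattr__."""

    template_fields = ()
    _instantiated = False  # class-level defaults, flipped per instance
    _locked_for_execution = False

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.resolve_calls = 0  # counts how often dependencies are resolved

    @property
    def output(self):
        return XComArg(self)

    def set_xcomargs_dependencies(self):
        self.resolve_calls += 1
        for name in self.template_fields:
            value = getattr(self, name, None)
            if isinstance(value, XComArg):
                self.upstream.add(value.operator.task_id)

    def __setattr__(self, key, value):
        super().__setattr__(key, value)
        if self._instantiated and not self._locked_for_execution and key in self.template_fields:
            self.set_xcomargs_dependencies()

    def run(self):
        self._locked_for_execution = True  # hypothetical: a runner would set this before execute()
        self.execute()

    def execute(self):
        pass


class CustomOp(Operator):
    template_fields = ("field0", "field1")

    def __init__(self, task_id, **fields):
        super().__init__(task_id)
        for key, value in fields.items():
            setattr(self, key, value)

    def execute(self):
        # Reassigning a template field at run time must not touch the DAG structure
        self.field0 = Operator("late").output


op1 = Operator("op1")
op2 = CustomOp("op2", field0=op1.output, field1="static")
print(op2.upstream, op2.resolve_calls)  # {'op1'} 1

op3 = Operator("op3")
op2.field1 = op3.output  # assigned after __init__: picked up by __setattr__
print(sorted(op2.upstream))  # ['op1', 'op3']

op2.run()  # execute() reassigns field0, but the lock skips resolution
print(sorted(op2.upstream))  # ['op1', 'op3']
```

Construction resolves exactly once (via the metaclass), later assignments are caught by `__setattr__`, and nothing changes once the lock is set.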
[GitHub] [airflow] turbaszek edited a comment on pull request #8805: Resolve upstream tasks when template field is XComArg
turbaszek edited a comment on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636329264

> we could let it work when setting _any_ attribute to an XComArg, which would be nice

That would mean that if we set 20 attributes in `__init__` then we run `self.set_xcomargs_dependencies` 20 times. This was the reason for the `if`. But more importantly, `self.set_xcomargs_dependencies` would be called before setting the value for the template field. For example:

```python
class DummyOp(BaseOperator):
    template_fields = ("field",)

    def __init__(self, field):
        self.x = 42  # calls self.set_xcomargs_dependencies and fails because self has no attribute "field" yet
        self.field = field
```
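To make both objections concrete, here is a toy sketch in plain Python (no Airflow; `XComArg`, `EagerOperator`, `TolerantEagerOperator` and the `resolve_calls` counter are hypothetical) of a hook that resolves dependencies on every attribute assignment. The strict version fails exactly as described above, and a variant that tolerates unset fields avoids the crash but still resolves once per assignment: 21 times for 20 extra attributes plus the template field.

```python
class XComArg:
    """Hypothetical stand-in for Airflow's XComArg: remembers the producing task_id."""

    def __init__(self, task_id):
        self.task_id = task_id


class EagerOperator:
    """Toy operator that resolves XComArg dependencies on *every* attribute assignment."""

    template_fields = ("field",)

    def __init__(self, field):
        # Bookkeeping goes through object.__setattr__ so it does not trigger the hook
        object.__setattr__(self, "upstream", set())
        object.__setattr__(self, "resolve_calls", 0)
        self.x = 42  # triggers resolution before self.field exists
        self.field = field

    def __setattr__(self, key, value):
        super().__setattr__(key, value)
        self.set_xcomargs_dependencies()

    def set_xcomargs_dependencies(self):
        object.__setattr__(self, "resolve_calls", self.resolve_calls + 1)
        for name in self.template_fields:
            value = getattr(self, name)  # AttributeError if the field is not assigned yet
            if isinstance(value, XComArg):
                self.upstream.add(value.task_id)


class TolerantEagerOperator(EagerOperator):
    """Same hook, but skips unset fields; it still resolves once per assignment."""

    def __init__(self, field, extra_attrs=20):
        object.__setattr__(self, "upstream", set())
        object.__setattr__(self, "resolve_calls", 0)
        for i in range(extra_attrs):
            setattr(self, f"attr{i}", i)  # each assignment runs a full resolution
        self.field = field

    def set_xcomargs_dependencies(self):
        object.__setattr__(self, "resolve_calls", self.resolve_calls + 1)
        for name in self.template_fields:
            value = getattr(self, name, None)
            if isinstance(value, XComArg):
                self.upstream.add(value.task_id)


try:
    EagerOperator(XComArg("op1"))
    eager_failed = False
except AttributeError as err:
    eager_failed = True
    print("eager resolution failed:", err)

op = TolerantEagerOperator(XComArg("op1"))
print(op.upstream, op.resolve_calls)  # {'op1'} 21
```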
[GitHub] [airflow] turbaszek edited a comment on pull request #8805: Resolve upstream tasks when template field is XComArg
turbaszek edited a comment on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636327497

@ashb I forgot what the problem with the metaclass was:

```python
with DAG(dag_id='xcomargs_test', default_args={"start_date": datetime.today()}):
    op1 = DummyOperator(task_id="op1")
    op2 = CustomOp(task_id="op2")
    op2.field = op1.output  # value is set after init
    assert op1 in op2.upstream_list
```

This needs to be solved (see your comment https://github.com/apache/airflow/pull/8805#discussion_r423205936). But this hints at another possible solution without a metaclass:

```python
class BaseOperator:
    def __setattr__(self, key, value):
        super().__setattr__(key, value)
        # Resolve upstreams set by assigning an XComArg after initializing
        # an operator, example:
        #   op = BashOperator()
        #   op.bash_command = "sleep 1"
        if key in self.template_fields:
            self.set_xcomargs_dependencies()
```

This one is nice and simple; however, I am not sure whether it will degrade performance in any way. @evgenyshulman @casassg @ashb @mik-laj thoughts?
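A rough, Airflow-free sketch can compare the metaclass-only and `__setattr__`-only approaches on the two points raised here. All names (`XComArg`, `Base`, `ResolveAfterInit`, `MetaclassOp`, `SetattrOp`, `field_reads`) are hypothetical, and counting template-field reads is only a proxy for cost, not a benchmark. With 30 template fields, the `__setattr__` hook rescans every field on each assignment (30 × 30 reads instead of 30), which is consistent with, though not proof of, the slowdown measured elsewhere in this thread; in exchange it catches assignments made after `__init__`, which the metaclass alone misses.

```python
class XComArg:
    """Hypothetical stand-in for Airflow's XComArg: remembers the producing task_id."""

    def __init__(self, task_id):
        self.task_id = task_id


class Base:
    """Toy operator with 30 template fields."""

    template_fields = tuple(f"field{i}" for i in range(30))

    def __init__(self, task_id, **fields):
        # Bookkeeping bypasses any __setattr__ hook
        object.__setattr__(self, "task_id", task_id)
        object.__setattr__(self, "upstream", set())
        object.__setattr__(self, "field_reads", 0)
        for key, value in fields.items():
            setattr(self, key, value)

    @property
    def output(self):
        return XComArg(self.task_id)

    def set_xcomargs_dependencies(self):
        for name in self.template_fields:
            object.__setattr__(self, "field_reads", self.field_reads + 1)
            value = getattr(self, name, None)
            if isinstance(value, XComArg):
                self.upstream.add(value.task_id)


class ResolveAfterInit(type):
    """Metaclass-only approach: resolve once, right after __init__ returns."""

    def __call__(cls, *args, **kwargs):
        obj = super().__call__(*args, **kwargs)
        obj.set_xcomargs_dependencies()
        return obj


class MetaclassOp(Base, metaclass=ResolveAfterInit):
    pass


class SetattrOp(Base):
    """__setattr__-only approach: resolve whenever a template field is assigned."""

    def __setattr__(self, key, value):
        super().__setattr__(key, value)
        if key in self.template_fields:
            self.set_xcomargs_dependencies()


src = Base("op1")
fields = {name: src.output for name in Base.template_fields}
m = MetaclassOp("m", **fields)
s = SetattrOp("s", **fields)
print(m.field_reads, s.field_reads)  # 30 900

late = Base("late")
m.field0 = late.output  # metaclass-only: nothing resolves this assignment
s.field0 = late.output  # the __setattr__ hook picks it up
print("late" in m.upstream, "late" in s.upstream)  # False True
```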