[GitHub] [airflow] turbaszek edited a comment on pull request #8805: Resolve upstream tasks when template field is XComArg

2020-06-01 Thread GitBox


turbaszek edited a comment on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636777316


   > > Why? We don't want to do any custom action (including resolving 
upstream) during execution. And it may happen that someone will assign/reassign 
value to template field attribute in execute()
   > 
   > This is in fact _exactly_ what `task.render_template_fields()` does, so 
yes, we do want to set something like this.
   
   @ashb true, but I think we don't want to set upstreams when executing a 
task, right?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] turbaszek edited a comment on pull request #8805: Resolve upstream tasks when template field is XComArg

2020-05-31 Thread GitBox


turbaszek edited a comment on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636334773


   I did a simple test:
   ```python
   from datetime import datetime

   from airflow import DAG
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.utils.decorators import apply_defaults
   from scripts.perf.perf_kit import python, memory, repeat_and_time

   fields = [f"field{i}" for i in range(30)]


   class CustomOp(DummyOperator):
       template_fields = fields  # 30 template fields

       @apply_defaults
       def __init__(self, *args, **kwargs):
           super().__init__(*args, task_id=kwargs["task_id"])
           for key in kwargs:
               if key.startswith("field"):
                   setattr(self, key, kwargs[key])


   if __name__ == '__main__':
       N = 10
       OP_N = 100

       @repeat_and_time.timing(N)
       @repeat_and_time.repeat(N)
       def case():
           with DAG("xcomargs_test", default_args={"start_date": datetime.today()}):
               op1 = DummyOperator(task_id="op1")
               for i in range(OP_N):
                   kwargs = {k: op1.output for k in fields}
                   CustomOp(task_id=f"task_{i}", **kwargs)

       print("OPs: ", OP_N)
       case()
   ```
   Average time for 100 operators in a single DAG:
   - metaclass: 471.470 ms
   - setattr + if: 7072.531 ms
   - setattr: 7208.424 ms
   
   So using `__setattr__` is roughly **15** times slower than the metaclass 
approach (7072.531 / 471.470 ≈ 15.0)...
   
   But to make everything work smoothly we can mix the two approaches:
   ```python
   class BaseOperatorMeta(type):
       def __call__(cls, *args, **kwargs):
           obj: BaseOperator = type.__call__(cls, *args, **kwargs)
           # Set upstream task defined by XComArgs passed to template fields
           # of an operator
           obj.set_xcomargs_dependencies()
           obj._instantiated = True
           return obj


   class BaseOperator(Operator, LoggingMixin, metaclass=BaseOperatorMeta):
       _instantiated = False

       def __setattr__(self, key, value):
           super().__setattr__(key, value)
           if self._instantiated and key in self.template_fields:
               self.set_xcomargs_dependencies()
   ```
   
   this gives the following result **471.381 ms**
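
To see why the mixed approach keeps construction cheap, here is a minimal standalone sketch that does not depend on Airflow. The names `OperatorMeta`, `Operator`, `resolve_dependencies` and `resolve_calls` are hypothetical stand-ins; `resolve_dependencies` only counts how often it is invoked, it is not the real `set_xcomargs_dependencies`.

```python
class OperatorMeta(type):
    """Runs dependency resolution once, after __init__ has finished."""

    def __call__(cls, *args, **kwargs):
        obj = super().__call__(*args, **kwargs)
        obj.resolve_dependencies()
        obj._instantiated = True
        return obj


class Operator(metaclass=OperatorMeta):
    template_fields = ("field_a", "field_b")
    _instantiated = False

    def __init__(self, field_a=None, field_b=None):
        self.resolve_calls = 0
        self.field_a = field_a
        self.field_b = field_b

    def __setattr__(self, key, value):
        super().__setattr__(key, value)
        # During __init__ the flag is still False, so nothing happens here.
        if self._instantiated and key in self.template_fields:
            self.resolve_dependencies()

    def resolve_dependencies(self):
        # Stand-in for set_xcomargs_dependencies(): only counts invocations.
        self.resolve_calls += 1


op = Operator(field_a="x", field_b="y")
calls_after_init = op.resolve_calls  # one call, made by the metaclass
op.field_a = "z"  # template field set after init: one more call
op.note = "ignored"  # not a template field: no call
calls_after_updates = op.resolve_calls
```

Under these assumptions the resolver runs once per construction regardless of how many template fields `__init__` assigns, and once more for each later assignment to a template field.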
   
   Edit: additionally, we should add a `_locked_for_execution` flag to 
`BaseOperator` and set it to `True` before executing. Then adjust `__setattr__` to:
   ```python
   def __setattr__(self, key, value):
       super().__setattr__(key, value)
       # Skip dependency resolution during __init__ and during execution
       if (
           self._instantiated
           and not self._locked_for_execution
           and key in self.template_fields
       ):
           self.set_xcomargs_dependencies()
   ```
   Why? We don't want to run any custom action (including resolving upstreams) 
during execution, and it may happen that someone assigns or reassigns a value to a 
template field attribute in `execute()`
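
A minimal standalone sketch of the lock, again without Airflow. `LockableOperator`, `run`, `resolve_dependencies` and `resolve_calls` are hypothetical names used only for illustration, and `_instantiated` is set at the end of `__init__` here only to keep the example short (in the proposal above the metaclass sets it).

```python
class LockableOperator:
    template_fields = ("command",)
    _instantiated = False
    _locked_for_execution = False

    def __init__(self, command):
        self.resolve_calls = 0
        self.command = command
        self._instantiated = True  # done by the metaclass in the real proposal

    def __setattr__(self, key, value):
        super().__setattr__(key, value)
        if (
            self._instantiated
            and not self._locked_for_execution
            and key in self.template_fields
        ):
            self.resolve_dependencies()

    def resolve_dependencies(self):
        # Stand-in for set_xcomargs_dependencies(): only counts invocations.
        self.resolve_calls += 1

    def execute(self):
        # Reassigning a template field during execution must not touch the DAG.
        self.command = "rewritten inside execute"

    def run(self):
        # Hypothetical runner: lock the operator, then execute it.
        self._locked_for_execution = True
        self.execute()


op = LockableOperator(command="echo 1")
op.command = "echo 2"  # before execution: resolver runs
calls_before_run = op.resolve_calls
op.run()  # assignment inside execute(): resolver is skipped
calls_after_run = op.resolve_calls
```

With the lock in place the assignment inside `execute()` still updates the attribute, but the resolver count does not change.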







[GitHub] [airflow] turbaszek edited a comment on pull request #8805: Resolve upstream tasks when template field is XComArg

2020-05-30 Thread GitBox


turbaszek edited a comment on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636334773


   I did a simple test:
   ```python
   from datetime import datetime

   from airflow import DAG
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.utils.decorators import apply_defaults
   from scripts.perf.perf_kit import python, memory, repeat_and_time

   fields = [f"field{i}" for i in range(30)]


   class CustomOp(DummyOperator):
       template_fields = fields  # 30 template fields

       @apply_defaults
       def __init__(self, *args, **kwargs):
           super().__init__(*args, task_id=kwargs["task_id"])
           for key in kwargs:
               if key.startswith("field"):
                   setattr(self, key, kwargs[key])


   if __name__ == '__main__':
       N = 10
       OP_N = 100

       @repeat_and_time.timing(N)
       @repeat_and_time.repeat(N)
       def case():
           with DAG("xcomargs_test", default_args={"start_date": datetime.today()}):
               op1 = DummyOperator(task_id="op1")
               for i in range(OP_N):
                   kwargs = {k: op1.output for k in fields}
                   CustomOp(task_id=f"task_{i}", **kwargs)

       print("OPs: ", OP_N)
       case()
   ```
   Average time for 100 OPs in single DAG:
   - metaclass: 492.990 ms
   - setattr + if: 7072.531 ms
   - setattr: 7208.424 ms
   
   So using `__setattr__` is about **14** times slower than the metaclass approach... 
   
   But to make everything work smoothly we can mix the two approaches:
   ```python
   class BaseOperatorMeta(type):
       def __call__(cls, *args, **kwargs):
           obj: BaseOperator = type.__call__(cls, *args, **kwargs)
           # Set upstream task defined by XComArgs passed to template fields
           # of an operator
           obj.set_xcomargs_dependencies()
           obj._instantiated = True
           return obj


   class BaseOperator(Operator, LoggingMixin, metaclass=BaseOperatorMeta):
       _instantiated = False

       def __setattr__(self, key, value):
           super().__setattr__(key, value)
           if self._instantiated and key in self.template_fields:
               self.set_xcomargs_dependencies()
   ```
   
   this gives the following result **471.381 ms**







[GitHub] [airflow] turbaszek edited a comment on pull request #8805: Resolve upstream tasks when template field is XComArg

2020-05-30 Thread GitBox


turbaszek edited a comment on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636334773


   I did a simple test:
   ```python
   from datetime import datetime

   from airflow import DAG
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.utils.decorators import apply_defaults
   from scripts.perf.perf_kit import python, memory, repeat_and_time

   fields = [f"field{i}" for i in range(30)]


   class CustomOp(DummyOperator):
       template_fields = fields

       @apply_defaults
       def __init__(self, *args, **kwargs):
           super().__init__(*args, task_id=kwargs["task_id"])
           for key in kwargs:
               if key.startswith("field"):
                   setattr(self, key, kwargs[key])


   if __name__ == '__main__':
       N = 10
       DAG_N = 100

       @repeat_and_time.timing(N)
       @repeat_and_time.repeat(N)
       def case():
           with DAG("xcomargs_test", default_args={"start_date": datetime.today()}):
               op1 = DummyOperator(task_id="op1")
               for i in range(DAG_N):
                   kwargs = {k: op1.output for k in fields}
                   CustomOp(task_id=f"task_{i}", **kwargs)

       print("DAGs: ", DAG_N)
       case()
   ```
   Results for 100 DAGs:
   - metaclass: 492.990 ms
   - setattr + if: 7072.531 ms
   - setattr: 7208.424 ms
   
   So using `__setattr__` is about **14** times slower than the metaclass approach... 







[GitHub] [airflow] turbaszek edited a comment on pull request #8805: Resolve upstream tasks when template field is XComArg

2020-05-30 Thread GitBox


turbaszek edited a comment on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636329264


   > we could let it work when setting _any_ attribute to an XComArg, which 
would be nice
   
   That would mean that if we set 20 attributes in `__init__` then we run 
`self.set_xcomargs_dependencies` 20 times. This was the reason for the if. 
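
A small standalone sketch of that cost, without Airflow. `NaiveOperator`, `resolve_dependencies` and `resolve_calls` are hypothetical names; the resolver here only counts its own invocations.

```python
class NaiveOperator:
    """Resolves dependencies on every attribute assignment, with no guard."""

    def __init__(self, **attrs):
        for name, value in attrs.items():
            setattr(self, name, value)

    def __setattr__(self, key, value):
        super().__setattr__(key, value)
        self.resolve_dependencies()

    def resolve_dependencies(self):
        # Bump the counter via object.__setattr__ so the hook is not re-entered.
        current = getattr(self, "resolve_calls", 0)
        object.__setattr__(self, "resolve_calls", current + 1)


op = NaiveOperator(**{f"attr{i}": i for i in range(20)})
calls = op.resolve_calls  # one resolver call per attribute set in __init__
```

Each of the 20 assignments in `__init__` triggers the resolver, so the work grows with the number of attributes instead of happening once per operator.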
   
   







[GitHub] [airflow] turbaszek edited a comment on pull request #8805: Resolve upstream tasks when template field is XComArg

2020-05-30 Thread GitBox


turbaszek edited a comment on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636329264


   > we could let it work when setting _any_ attribute to an XComArg, which 
would be nice
   
   That would mean that if we set 20 attributes in `__init__` then we run 
`self.set_xcomargs_dependencies` 20 times. This was the reason for the if. 
   
   But more importantly, `self.set_xcomargs_dependencies` would be called before 
the value of the template field is set. For example:
   ```python
   class DummyOp(BaseOperator):
       template_fields = ("field",)

       def __init__(self, field):
           # Calls self.set_xcomargs_dependencies and fails
           # because self has no attribute "field" yet
           self.x = 42
           self.field = field
   ```
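
The ordering problem can be reproduced without Airflow. In this sketch `EagerOperator` and `resolve_dependencies` are hypothetical, and the resolver is assumed to read every template field with `getattr`, which may differ from what `set_xcomargs_dependencies` actually does.

```python
class EagerOperator:
    template_fields = ("field",)

    def __init__(self, field):
        self.x = 42  # triggers the resolver before "field" exists
        self.field = field

    def __setattr__(self, key, value):
        super().__setattr__(key, value)
        self.resolve_dependencies()

    def resolve_dependencies(self):
        for name in self.template_fields:
            getattr(self, name)  # raises AttributeError if not assigned yet


error = None
try:
    EagerOperator(field="value")
except AttributeError as exc:
    error = exc  # construction never gets as far as assigning "field"
```

Under that assumption the constructor fails on the first assignment, which is why the hook needs either a guard on the key or a flag marking the end of `__init__`.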
   







[GitHub] [airflow] turbaszek edited a comment on pull request #8805: Resolve upstream tasks when template field is XComArg

2020-05-30 Thread GitBox


turbaszek edited a comment on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636327497


   @ashb I had forgotten what the problem with the metaclass was:
   ```python
   with DAG(dag_id='xcomargs_test', default_args={"start_date": datetime.today()}):
       op1 = DummyOperator(task_id="op1")
       op2 = CustomOp(task_id="op2")
       op2.field = op1.output  # value is set after init

       assert op1 in op2.upstream_list
   ```
   this needs to be solved (your comment 
https://github.com/apache/airflow/pull/8805#discussion_r423205936)
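
A standalone sketch of why a metaclass alone misses this case. `InitOnlyMeta`, `Task` and `resolve_dependencies` are hypothetical, and for brevity a `Task` stored directly in a template field stands in for an `XComArg` pointing at that task.

```python
class InitOnlyMeta(type):
    """Resolves dependencies exactly once, right after __init__."""

    def __call__(cls, *args, **kwargs):
        obj = super().__call__(*args, **kwargs)
        obj.resolve_dependencies()
        return obj


class Task(metaclass=InitOnlyMeta):
    template_fields = ("field",)

    def __init__(self, task_id, field=None):
        self.task_id = task_id
        self.field = field
        self.upstream = set()

    def resolve_dependencies(self):
        # Treat any Task found in a template field as an upstream task.
        for name in self.template_fields:
            value = getattr(self, name)
            if isinstance(value, Task):
                self.upstream.add(value.task_id)


op1 = Task("op1")
op2 = Task("op2")
op2.field = op1  # assigned after init: the metaclass never sees it
missed = "op1" not in op2.upstream

op3 = Task("op3", field=op1)  # passed to __init__: resolved as expected
```

Only the value passed to the constructor is picked up, so assignments made after construction need a separate hook.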
   
   But this hints at another possible solution without a metaclass:
   ```python
   class BaseOperator():
       def __setattr__(self, key, value):
           super().__setattr__(key, value)
           # Resolve upstreams set by assigning an XComArg after initializing
           # an operator, example:
           #   op = BashOperator()
           #   op.bash_command = "sleep 1"
           from airflow.models.xcom_arg import XComArg
           if key in self.template_fields:
               self.set_xcomargs_dependencies()
   ```
   This one is nice and simple; however, I am not sure whether it will degrade 
performance in any way.
   
   @evgenyshulman @casassg @ashb @mik-laj thoughts? 


