[GitHub] [airflow] maganaluis commented on a change in pull request #8256: updated _write_args on PythonVirtualenvOperator

2020-06-07 Thread GitBox


maganaluis commented on a change in pull request #8256:
URL: https://github.com/apache/airflow/pull/8256#discussion_r436408970



##
File path: tests/operators/test_python_operator.py
##
@@ -340,6 +341,80 @@ def test_echo_env_variables(self):
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
 
+class TestPythonVirtualenvOperator(TestPythonBase):
+    @classmethod
+    def setUpClass(cls):
+        super(TestPythonVirtualenvOperator, cls).setUpClass()
+        with create_session() as session:
+            session.query(DagRun).delete()
+            session.query(TI).delete()
+            session.commit()
+
+    def setUp(self):
+        super(TestPythonVirtualenvOperator, self).setUp()
+
+        def del_env(key):
+            try:
+                del os.environ[key]
+            except KeyError:
+                pass
+
+        del_env('AIRFLOW_CTX_DAG_ID')
+        del_env('AIRFLOW_CTX_TASK_ID')
+        del_env('AIRFLOW_CTX_EXECUTION_DATE')
+        del_env('AIRFLOW_CTX_DAG_RUN_ID')
+        self.dag = DAG(
+            'test_dag',
+            default_args={
+                'owner': 'airflow',
+                'start_date': DEFAULT_DATE})
+        self.addCleanup(self.dag.clear)
+        self.clear_run()
+        self.addCleanup(self.clear_run)
+
+    def tearDown(self):
+        super(TestPythonVirtualenvOperator, self).tearDown()
+        with create_session() as session:
+            session.query(DagRun).delete()
+            session.query(TI).delete()
+            session.commit()
+
+        for var in TI_CONTEXT_ENV_VARS:
+            if var in os.environ:
+                del os.environ[var]
+
+    def clear_run(self):
+        self.run = False
+
+    def do_run(self):
+        self.run = True
+
+    def is_run(self):
+        return self.run
+
+    def test_config_context(self):
+        """
+        This test ensures we can use dag_run from the context
+        to access the configuration at run time that's being
+        passed from the UI, CLI, and REST API.
+        """
+        self.dag.create_dagrun(
+            run_id='manual__' + DEFAULT_DATE.isoformat(),
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            state=State.RUNNING,
+            external_trigger=False,
+        )
+
+        def pass_function(**kwargs):
+            kwargs['dag_run'].conf
+
+        t = PythonVirtualenvOperator(task_id='config_dag_run', dag=self.dag,
+                                     provide_context=True,
+                                     python_callable=pass_function)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)

Review comment:
   This test runs in about 2 seconds, which is not too slow.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] maganaluis commented on a change in pull request #8256: updated _write_args on PythonVirtualenvOperator

2020-05-03 Thread GitBox


maganaluis commented on a change in pull request #8256:
URL: https://github.com/apache/airflow/pull/8256#discussion_r419189015



##
File path: airflow/operators/python_operator.py
##
@@ -330,13 +330,28 @@ def _write_string_args(self, filename):
 
     def _write_args(self, input_filename):
         # serialize args to file
+        if self.use_dill:
+            serializer = dill
+        else:
+            serializer = pickle
+        # some args from context can't be loaded in virtual env
+        invalid_args = set(['dag', 'task', 'ti'])
         if self._pass_op_args():
+            kwargs = {}
+            for key, value in self.op_kwargs.items():

Review comment:
   @Fokko Thank you for taking the time to review this PR. I've updated it 
based on your suggestions. 
   
   ```python
   def _write_args(self, input_filename):
       # serialize args to file
       if self.use_dill:
           serializer = dill
       else:
           serializer = pickle
       # some items from context can't be loaded in virtual env
       # see pr https://github.com/apache/airflow/pull/8256
       not_serializable = {'dag', 'task', 'ti', 'macros', 'task_instance', 'var'}
       if self._pass_op_args():
           kwargs = {key: value for key, value in self.op_kwargs.items()
                     if key not in not_serializable}
           with open(input_filename, 'wb') as f:
               arg_dict = ({'args': self.op_args, 'kwargs': kwargs})
               serializer.dump(arg_dict, f)
   ```
   
   
   The invalid arguments here are SQLAlchemy db models. Some fail to serialize, 
and others (dag, task, and ti) fail to deserialize at run time because they 
reference objects that are no longer present. See the log.
   
   [log_1.txt](https://github.com/apache/airflow/files/4572523/log_1.txt)
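
The effect of dropping those keys can be illustrated outside Airflow with a minimal sketch. The `FakeTaskInstance` class and the `filter_kwargs` helper are hypothetical stand-ins, not Airflow code, and the lock attribute only simulates an object that `pickle` refuses to serialize.

```python
import pickle
import threading

# Same key set as in the updated _write_args; everything else is illustrative.
NOT_SERIALIZABLE = {'dag', 'task', 'ti', 'macros', 'task_instance', 'var'}


class FakeTaskInstance:
    """Hypothetical stand-in for a context object that cannot be pickled."""

    def __init__(self):
        self.lock = threading.Lock()  # lock objects are not picklable


def filter_kwargs(op_kwargs):
    """Return a copy of op_kwargs without the keys known to break serialization."""
    return {key: value for key, value in op_kwargs.items()
            if key not in NOT_SERIALIZABLE}


context = {'ti': FakeTaskInstance(), 'ds': '2020-05-03', 'params': {'n': 1}}

# Pickling the unfiltered context fails because of the 'ti' entry.
try:
    pickle.dumps(context)
    unfiltered_ok = True
except (TypeError, pickle.PicklingError):
    unfiltered_ok = False

# After filtering, the remaining plain values survive a round trip.
restored = pickle.loads(pickle.dumps(filter_kwargs(context)))
```

The trade-off is that the callable inside the virtualenv no longer receives the dropped keys at all, so anything it needs from them has to be passed as a plain value.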
   
   To solve these issues, we would need to implement `__getstate__` and 
`__setstate__` methods on those objects so that they serialize properly. 
That is outside the scope of this PR and would require considerable work and 
tests. 
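
For reference, this is roughly what that approach looks like on a plain Python class. It is only a sketch under assumptions: `Resource` is a hypothetical class, not one of the Airflow models, and the lock stands in for whatever attribute cannot be pickled.

```python
import pickle
import threading


class Resource:
    """Hypothetical object holding an attribute that cannot be pickled."""

    def __init__(self, name):
        self.name = name
        self._lock = threading.Lock()  # not picklable

    def __getstate__(self):
        # Copy the instance dict and drop the attribute pickle cannot handle.
        state = self.__dict__.copy()
        del state['_lock']
        return state

    def __setstate__(self, state):
        # Restore the plain attributes, then rebuild the dropped one.
        self.__dict__.update(state)
        self._lock = threading.Lock()


# Round trip: the lock is skipped on dump and recreated on load.
clone = pickle.loads(pickle.dumps(Resource('example')))
```

Doing the same for the real context objects would mean deciding, attribute by attribute, what can be rebuilt inside the virtualenv, which is where most of the work would be.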
   
   I've also added a test to ensure this stays covered in the future. 
   
   ```python
   def test_config_context(self):
       """
       This test ensures we can use dag_run from the context
       to access the configuration at run time that's being
       passed from the UI, CLI, and REST API.
       """
       self.dag.create_dagrun(
           run_id='manual__' + DEFAULT_DATE.isoformat(),
           execution_date=DEFAULT_DATE,
           start_date=DEFAULT_DATE,
           state=State.RUNNING,
           external_trigger=False,
       )

       def pass_function(**kwargs):
           kwargs['dag_run'].conf

       t = PythonVirtualenvOperator(task_id='config_dag_run', dag=self.dag,
                                    provide_context=True,
                                    python_callable=pass_function)
       t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
   ```
   













[GitHub] [airflow] maganaluis commented on a change in pull request #8256: updated _write_args on PythonVirtualenvOperator

2020-04-15 Thread GitBox
maganaluis commented on a change in pull request #8256: updated _write_args on 
PythonVirtualenvOperator
URL: https://github.com/apache/airflow/pull/8256#discussion_r409112631
 
 

 ##
 File path: airflow/operators/python_operator.py
 ##
 @@ -330,13 +330,28 @@ def _write_string_args(self, filename):
 
     def _write_args(self, input_filename):
         # serialize args to file
+        if self.use_dill:
+            serializer = dill
+        else:
+            serializer = pickle
+        # some args from context can't be loaded in virtual env
+        invalid_args = set(['dag', 'task', 'ti'])
 
 Review comment:
   @jloehel I don't mind if this PR doesn't get merged, as long as 
PythonVirtualenvOperator gets fixed; I think that will help a lot of people. 
Feel free to send me an e-mail if you would like to follow up, in case I need 
to modify that function again. My address should be in my profile. Cheers :) 




With regards,
Apache Git Services


[GitHub] [airflow] maganaluis commented on a change in pull request #8256: updated _write_args on PythonVirtualenvOperator

2020-04-14 Thread GitBox
maganaluis commented on a change in pull request #8256: updated _write_args on 
PythonVirtualenvOperator
URL: https://github.com/apache/airflow/pull/8256#discussion_r408498490
 
 

 ##
 File path: airflow/operators/python_operator.py
 ##
 @@ -330,13 +330,28 @@ def _write_string_args(self, filename):
 
     def _write_args(self, input_filename):
         # serialize args to file
+        if self.use_dill:
+            serializer = dill
+        else:
+            serializer = pickle
+        # some args from context can't be loaded in virtual env
+        invalid_args = set(['dag', 'task', 'ti'])
 
 Review comment:
   @jloehel We have not encountered any cases where we needed to use XCom, so 
I'm not familiar with it. Let me know whether you want me to modify this code 
or whether you'll be making the changes on your side. 

