[GitHub] [airflow] maganaluis commented on a change in pull request #8256: updated _write_args on PythonVirtualenvOperator
maganaluis commented on a change in pull request #8256:
URL: https://github.com/apache/airflow/pull/8256#discussion_r436408970

## File path: tests/operators/test_python_operator.py ##
@@ -340,6 +341,80 @@ def test_echo_env_variables(self):
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+class TestPythonVirtualenvOperator(TestPythonBase):
+    @classmethod
+    def setUpClass(cls):
+        super(TestPythonVirtualenvOperator, cls).setUpClass()
+        with create_session() as session:
+            session.query(DagRun).delete()
+            session.query(TI).delete()
+            session.commit()
+
+    def setUp(self):
+        super(TestPythonVirtualenvOperator, self).setUp()
+
+        def del_env(key):
+            try:
+                del os.environ[key]
+            except KeyError:
+                pass
+
+        del_env('AIRFLOW_CTX_DAG_ID')
+        del_env('AIRFLOW_CTX_TASK_ID')
+        del_env('AIRFLOW_CTX_EXECUTION_DATE')
+        del_env('AIRFLOW_CTX_DAG_RUN_ID')
+        self.dag = DAG(
+            'test_dag',
+            default_args={
+                'owner': 'airflow',
+                'start_date': DEFAULT_DATE})
+        self.addCleanup(self.dag.clear)
+        self.clear_run()
+        self.addCleanup(self.clear_run)
+
+    def tearDown(self):
+        super(TestPythonVirtualenvOperator, self).tearDown()
+        with create_session() as session:
+            session.query(DagRun).delete()
+            session.query(TI).delete()
+            session.commit()
+
+        for var in TI_CONTEXT_ENV_VARS:
+            if var in os.environ:
+                del os.environ[var]
+
+    def clear_run(self):
+        self.run = False
+
+    def do_run(self):
+        self.run = True
+
+    def is_run(self):
+        return self.run
+
+    def test_config_context(self):
+        """
+        This test ensures we can use dag_run from the context
+        to access the configuration at run time that's being
+        passed from the UI, CLI, and REST API.
+        """
+        self.dag.create_dagrun(
+            run_id='manual__' + DEFAULT_DATE.isoformat(),
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            state=State.RUNNING,
+            external_trigger=False,
+        )
+
+        def pass_function(**kwargs):
+            kwargs['dag_run'].conf
+
+        t = PythonVirtualenvOperator(task_id='config_dag_run', dag=self.dag,
+                                     provide_context=True,
+                                     python_callable=pass_function)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)

Review comment:
This test runs in about 2 seconds, so it's not too slow.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] maganaluis commented on a change in pull request #8256: updated _write_args on PythonVirtualenvOperator
maganaluis commented on a change in pull request #8256:
URL: https://github.com/apache/airflow/pull/8256#discussion_r419189015

## File path: airflow/operators/python_operator.py ##
@@ -330,13 +330,28 @@ def _write_string_args(self, filename):
     def _write_args(self, input_filename):
         # serialize args to file
+        if self.use_dill:
+            serializer = dill
+        else:
+            serializer = pickle
+        # some args from context can't be loaded in virtual env
+        invalid_args = set(['dag', 'task', 'ti'])
         if self._pass_op_args():
+            kwargs = {}
+            for key, value in self.op_kwargs.items():

Review comment:
@Fokko Thank you for taking the time to review this PR. I've updated it based on your suggestions.

```python
def _write_args(self, input_filename):
    # serialize args to file
    if self.use_dill:
        serializer = dill
    else:
        serializer = pickle
    # some items from context can't be loaded in virtual env
    # see pr https://github.com/apache/airflow/pull/8256
    not_serializable = {'dag', 'task', 'ti', 'macros', 'task_instance', 'var'}
    if self._pass_op_args():
        kwargs = {key: value for key, value in self.op_kwargs.items()
                  if key not in not_serializable}
        with open(input_filename, 'wb') as f:
            arg_dict = ({'args': self.op_args, 'kwargs': kwargs})
            serializer.dump(arg_dict, f)
```

The invalid arguments here are SQLAlchemy DB models: some fail to serialize, and others (`dag`, `task`, and `ti`) fail to deserialize at run time because they reference objects that are no longer present. See the log: [log_1.txt](https://github.com/apache/airflow/files/4572523/log_1.txt)

To solve these issues, we would need to implement `__getstate__` and `__setstate__` on those objects so that they serialize properly. That is outside the scope of this PR and would take considerable work and testing.

I've also added a test to make sure this case stays covered in the future:

```python
def test_config_context(self):
    """
    This test ensures we can use dag_run from the context
    to access the configuration at run time that's being
    passed from the UI, CLI, and REST API.
    """
    self.dag.create_dagrun(
        run_id='manual__' + DEFAULT_DATE.isoformat(),
        execution_date=DEFAULT_DATE,
        start_date=DEFAULT_DATE,
        state=State.RUNNING,
        external_trigger=False,
    )

    def pass_function(**kwargs):
        kwargs['dag_run'].conf

    t = PythonVirtualenvOperator(task_id='config_dag_run', dag=self.dag,
                                 provide_context=True,
                                 python_callable=pass_function)
    t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
```
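For context on the `__getstate__`/`__setstate__` approach that the comment above defers, here is a minimal sketch using only the standard library. `TaskLike` and its `_session` attribute are hypothetical stand-ins, not Airflow classes, and a `threading.Lock` plays the role of an unpicklable live resource. It only illustrates the mechanism; making Airflow's own `dag`, `task`, and `ti` objects round-trip would likely need more than this.

```python
import pickle
import threading


class TaskLike:
    """Hypothetical stand-in for an object (such as a task instance) that
    holds a reference which cannot be pickled."""

    def __init__(self, task_id):
        self.task_id = task_id
        # threading.Lock objects cannot be pickled; this plays the role of a
        # live resource that only exists in the parent process.
        self._session = threading.Lock()

    def __getstate__(self):
        # Copy the instance dict and drop the unpicklable attribute.
        state = self.__dict__.copy()
        del state['_session']
        return state

    def __setstate__(self, state):
        # Restore the plain attributes; the resource is not carried over, so
        # code on the receiving side would have to recreate it itself.
        self.__dict__.update(state)
        self._session = None


payload = pickle.dumps(TaskLike('config_dag_run'))
restored = pickle.loads(payload)
print(restored.task_id, restored._session)  # prints: config_dag_run None
```

Note that `pickle` records classes by reference, so the interpreter inside the virtualenv must still be able to import the class. These hooks only control which attributes travel with the instance.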
[GitHub] [airflow] maganaluis commented on a change in pull request #8256: updated _write_args on PythonVirtualenvOperator
maganaluis commented on a change in pull request #8256: updated _write_args on PythonVirtualenvOperator
URL: https://github.com/apache/airflow/pull/8256#discussion_r409112631

## File path: airflow/operators/python_operator.py ##
@@ -330,13 +330,28 @@ def _write_string_args(self, filename):
     def _write_args(self, input_filename):
         # serialize args to file
+        if self.use_dill:
+            serializer = dill
+        else:
+            serializer = pickle
+        # some args from context can't be loaded in virtual env
+        invalid_args = set(['dag', 'task', 'ti'])

Review comment:
@jloehel I don't mind if this PR doesn't get merged, as long as PythonVirtualenvOperator gets fixed; I think that will help a lot of people. Feel free to send me an e-mail if you'd like to follow up, in case I need to modify that function again. My address should be in my profile. Cheers :)
[GitHub] [airflow] maganaluis commented on a change in pull request #8256: updated _write_args on PythonVirtualenvOperator
maganaluis commented on a change in pull request #8256: updated _write_args on PythonVirtualenvOperator
URL: https://github.com/apache/airflow/pull/8256#discussion_r408498490

## File path: airflow/operators/python_operator.py ##
@@ -330,13 +330,28 @@ def _write_string_args(self, filename):
     def _write_args(self, input_filename):
         # serialize args to file
+        if self.use_dill:
+            serializer = dill
+        else:
+            serializer = pickle
+        # some args from context can't be loaded in virtual env
+        invalid_args = set(['dag', 'task', 'ti'])

Review comment:
@jloehel We haven't run into any cases where we needed XCom, so I'm not familiar with it. Let me know whether you want me to make any modifications to this code, or whether you'll be making the changes on your side.
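The step in the hunk above (choose a serializer, drop the context keys that cannot make the trip into the virtualenv, then dump what remains to a file) can be sketched in isolation. This is a simplified, hypothetical stand-in for `_write_args`, not Airflow's implementation: `write_args`, `read_args`, and the sample context are invented for illustration, the excluded keys are taken from the revised version quoted earlier in this thread, and it defaults to `pickle` where the operator switches to `dill` when `use_dill` is set.

```python
import os
import pickle
import tempfile

# Context keys excluded before serializing (from the revised _write_args
# quoted in this thread).
NOT_SERIALIZABLE = {'dag', 'task', 'ti', 'macros', 'task_instance', 'var'}


def write_args(path, op_args, op_kwargs, serializer=pickle):
    """Hypothetical helper: drop non-serializable context keys and dump the
    remaining positional and keyword args to `path`."""
    kwargs = {key: value for key, value in op_kwargs.items()
              if key not in NOT_SERIALIZABLE}
    with open(path, 'wb') as f:
        serializer.dump({'args': op_args, 'kwargs': kwargs}, f)


def read_args(path, serializer=pickle):
    """Hypothetical helper: what the script inside the virtualenv would do."""
    with open(path, 'rb') as f:
        return serializer.load(f)


context = {
    'ds': '2016-01-01',
    'params': {'retries': 3},
    # The standard pickle module cannot serialize a lambda; it stands in
    # for context objects that break serialization.
    'macros': lambda: None,
}

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'args.pkl')
    write_args(path, [1, 2], context)
    loaded = read_args(path)

print(loaded)
# prints: {'args': [1, 2], 'kwargs': {'ds': '2016-01-01', 'params': {'retries': 3}}}
```

Filtering by key name keeps the helper simple, but it only protects against the keys listed; any other unpicklable value in `op_kwargs` would still make the dump fail.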