feluelle commented on a change in pull request #9394: URL: https://github.com/apache/airflow/pull/9394#discussion_r444889415
########## File path: airflow/operators/python.py ########## @@ -412,140 +427,113 @@ def __init__( # pylint: disable=too-many-arguments templates_exts=templates_exts, *args, **kwargs) - self.requirements = requirements or [] + self.requirements = list(requirements or []) self.string_args = string_args or [] self.python_version = python_version self.use_dill = use_dill self.system_site_packages = system_site_packages - # check that dill is present if needed - dill_in_requirements = map(lambda x: x.lower().startswith('dill'), - self.requirements) - if (not system_site_packages) and use_dill and not any(dill_in_requirements): - raise AirflowException('If using dill, dill must be in the environment ' + - 'either via system_site_packages or requirements') - # check that a function is passed, and that it is not a lambda - if (not isinstance(self.python_callable, - types.FunctionType) or (self.python_callable.__name__ == - (lambda x: 0).__name__)): - raise AirflowException('{} only supports functions for python_callable arg'.format( - self.__class__.__name__)) - # check that args are passed iff python major version matches - if (python_version is not None and - str(python_version)[0] != str(sys.version_info[0]) and - self._pass_op_args()): - raise AirflowException("Passing op_args or op_kwargs is not supported across " - "different Python major versions " - "for PythonVirtualenvOperator. 
" - "Please use string_args.") + if not self.system_site_packages and self.use_dill and 'dill' not in self.requirements: + self.requirements.append('dill') + self.pickling_library = dill if self.use_dill else pickle def execute_callable(self): with TemporaryDirectory(prefix='venv') as tmp_dir: if self.templates_dict: self.op_kwargs['templates_dict'] = self.templates_dict - # generate filenames + input_filename = os.path.join(tmp_dir, 'script.in') output_filename = os.path.join(tmp_dir, 'script.out') string_args_filename = os.path.join(tmp_dir, 'string_args.txt') script_filename = os.path.join(tmp_dir, 'script.py') - # set up virtualenv - python_bin = 'python' + str(self.python_version) if self.python_version else None prepare_virtualenv( venv_directory=tmp_dir, - python_bin=python_bin, + python_bin=f'python{self.python_version}' if self.python_version else None, system_site_packages=self.system_site_packages, - requirements=self.requirements, + requirements=self.requirements ) self._write_args(input_filename) - self._write_script(script_filename) self._write_string_args(string_args_filename) + self._write_script(script_filename) + + execute_in_subprocess(cmd=[ + f'{tmp_dir}/bin/python', + script_filename, + input_filename, + output_filename, + string_args_filename + ]) - # execute command in virtualenv - execute_in_subprocess( - self._generate_python_cmd(tmp_dir, - script_filename, - input_filename, - output_filename, - string_args_filename)) return self._read_result(output_filename) - def _pass_op_args(self): - # we should only pass op_args if any are given to us - return len(self.op_args) + len(self.op_kwargs) > 0 + def _write_args(self, filename): + if self.op_args or self.op_kwargs: + if self.op_kwargs: + # some items from context can't be loaded in virtual env + self._keep_serializable_op_kwargs() + with open(filename, 'wb') as file: + self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file) + + def _keep_serializable_op_kwargs(self): 
+ # Remove unserializable objects + # otherwise "KeyError: 'Variable __getstate__ does not exist'" would be raised. + self.op_kwargs.pop('var', None) + # otherwise "TypeError: cannot serialize '_io.FileIO' object" would be raised. + self.op_kwargs.pop('task_instance', None) + self.op_kwargs.pop('ti', None) + + if self.system_site_packages or 'apache-airflow' in self.requirements: + # All can be serialized expecting it to run in an airflow env. + return + + # Not access to host packages and no apache-airflow installed. + # Remove airflow specific context + # otherwise "ModuleNotFoundError: No module named 'airflow'" would be raised. + self.op_kwargs.pop('macros', None) + self.op_kwargs.pop('conf', None) + self.op_kwargs.pop('dag', None) + self.op_kwargs.pop('dag_run', None) + self.op_kwargs.pop('task', None) + + if 'pendulum' in self.requirements and 'lazy_object_proxy' in self.requirements: + # ..but pendulum is installed so keep pendulum date objects + # Note: 'lazy_object_proxy' is needed to work. + return + + # No pendulum is installed either. So remove pendulum specific context. + # otherwise "ModuleNotFoundError: No module named 'pendulum'" would be raised. 
+ self.op_kwargs.pop('execution_date', None) + self.op_kwargs.pop('next_execution_date', None) + self.op_kwargs.pop('prev_execution_date', None) + self.op_kwargs.pop('prev_execution_date_success', None) + self.op_kwargs.pop('prev_start_date_success', None) def _write_string_args(self, filename): - # writes string_args to a file, which are read line by line with open(filename, 'w') as file: file.write('\n'.join(map(str, self.string_args))) - def _write_args(self, input_filename): - # serialize args to file - if self._pass_op_args(): - with open(input_filename, 'wb') as file: - arg_dict = ({'args': self.op_args, 'kwargs': self.op_kwargs}) - if self.use_dill: - dill.dump(arg_dict, file) - else: - pickle.dump(arg_dict, file) - - def _read_result(self, output_filename): - if os.stat(output_filename).st_size == 0: + def _write_script(self, filename): + with open(filename, 'w') as file: + python_code = render_virtualenv_script( + jinja_context=dict( + op_args=self.op_args, + op_kwargs=self.op_kwargs, + pickling_library=self.pickling_library.__name__, + python_callable=self.python_callable.__name__, + python_callable_source=dedent(inspect.getsource(self.python_callable)) + ) + ) + self.log.debug('Writing code to file\n %s', python_code) Review comment: I added the `:`, but I think the `\n` newline should stay, so that the generated code starts on its own line in the log. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org