Repository: incubator-airflow
Updated Branches:
  refs/heads/master 55b56a433 -> 07db7a3d7
[AIRFLOW-2652] implement / enhance baseOperator deepcopy

Make sure you have checked _all_ steps below.

### JIRA
- [x] My PR addresses the following [Airflow JIRA](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references them in the PR title. For example, "\[AIRFLOW-XXX\] My Airflow PR"
    - https://issues.apache.org/jira/browse/AIRFLOW-2652
    - In case you are fixing a typo in the documentation you can prepend your commit with \[AIRFLOW-XXX\], code changes always need a JIRA issue.

### Description
- [x] Here are some details about my PR, including screenshots of any UI changes:

When running ``airflow backfill`` on a PythonOperator, Airflow triggers a deepcopy of the task_instance. If some object cannot be deep-copied in a certain Python version (e.g. Protobuf in Python 2.7), an exception is thrown. Such objects should be shallow-copied instead of deep-copied. This PR reuses the ``__deepcopy__`` method in BaseOperator, but skips the deep copy for `op_kwargs` and `python_callable`.

### Tests
- [x] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:

I can't think of a good way to test. We encountered this in our production environment.

### Commits
- [x] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

### Documentation
- [x] In case of new functionality, my PR adds documentation that describes how to use it.
    - When adding new operators/hooks/sensors, the autoclass documentation generation needs to be added.

### Code Quality
- [x] Passes `git diff upstream/master -u -- "*.py" | flake8 --diff`

Closes #3528 from feng-tao/airflow-2652

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/07db7a3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/07db7a3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/07db7a3d

Branch: refs/heads/master
Commit: 07db7a3d711500c421f73e57002e87b662a2ab0f
Parents: 55b56a4
Author: Tao feng <tf...@lyft.com>
Authored: Thu Jun 21 16:28:30 2018 -0700
Committer: Maxime Beauchemin <maximebeauche...@gmail.com>
Committed: Thu Jun 21 16:28:30 2018 -0700

----------------------------------------------------------------------
 airflow/models.py                    | 23 ++++++++++++++---------
 airflow/operators/python_operator.py |  4 ++++
 tests/operators/python_operator.py   | 21 +++++++++++++++++++--
 3 files changed, 37 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/07db7a3d/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 8f83586..381e9d3 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2317,6 +2317,15 @@ class BaseOperator(LoggingMixin):
     ui_color = '#fff'
     ui_fgcolor = '#000'

+    # base list which includes all the attrs that don't need deep copy.
+    _base_operator_shallow_copy_attrs = ('user_defined_macros',
+                                         'user_defined_filters',
+                                         'params',
+                                         '_log',)
+
+    # each operator should override this class attr for shallow copy attrs.
+    shallow_copy_attrs = ()
+
     @apply_defaults
     def __init__(
             self,
@@ -2683,17 +2692,13 @@ class BaseOperator(LoggingMixin):
         result = cls.__new__(cls)
         memo[id(self)] = result

+        shallow_copy = cls.shallow_copy_attrs + cls._base_operator_shallow_copy_attrs
+
         for k, v in list(self.__dict__.items()):
-            if k not in ('user_defined_macros', 'user_defined_filters',
-                         'params', '_log'):
+            if k not in shallow_copy:
                 setattr(result, k, copy.deepcopy(v, memo))
-        result.params = self.params
-        if hasattr(self, 'user_defined_macros'):
-            result.user_defined_macros = self.user_defined_macros
-        if hasattr(self, 'user_defined_filters'):
-            result.user_defined_filters = self.user_defined_filters
-        if hasattr(self, '_log'):
-            result._log = self._log
+            else:
+                setattr(result, k, copy.copy(v))
         return result

     def __getstate__(self):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/07db7a3d/airflow/operators/python_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/python_operator.py b/airflow/operators/python_operator.py
index 88f3b1a..a564897 100644
--- a/airflow/operators/python_operator.py
+++ b/airflow/operators/python_operator.py
@@ -65,6 +65,10 @@ class PythonOperator(BaseOperator):
     template_ext = tuple()
     ui_color = '#ffefeb'

+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects(e.g protobuf).
+    shallow_copy_attrs = ('python_callable', 'op_kwargs',)
+
     @apply_defaults
     def __init__(
             self,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/07db7a3d/tests/operators/python_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/python_operator.py b/tests/operators/python_operator.py
index e44cfa4..43aa8a6 100644
--- a/tests/operators/python_operator.py
+++ b/tests/operators/python_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License. You may obtain a copy of the License at
-#
+#
 # http://www.apache.org/licenses/LICENSE-2.0
-#
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,6 +19,7 @@

 from __future__ import print_function, unicode_literals

+import copy
 import datetime
 import unittest

@@ -90,6 +91,22 @@ class PythonOperatorTest(unittest.TestCase):
                 task_id='python_operator',
                 dag=self.dag)

+    def test_python_operator_shallow_copy_attr(self):
+        not_callable = lambda x: x
+        original_task = PythonOperator(
+            python_callable=not_callable,
+            task_id='python_operator',
+            op_kwargs={'certain_attrs': ''},
+            dag=self.dag
+        )
+        new_task = copy.deepcopy(original_task)
+        # shallow copy op_kwargs
+        self.assertEquals(id(original_task.op_kwargs['certain_attrs']),
+                          id(new_task.op_kwargs['certain_attrs']))
+        # shallow copy python_callable
+        self.assertEquals(id(original_task.python_callable),
+                          id(new_task.python_callable))
+

 class BranchOperatorTest(unittest.TestCase):
     def setUp(self):
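
As a rough illustration of the pattern this commit introduces, the following self-contained sketch reproduces the `shallow_copy_attrs` idea outside Airflow. `Base`, `CallableTask`, `NoDeepCopy`, and `say_hello` are hypothetical stand-ins (for `BaseOperator`, `PythonOperator`, an object such as a protobuf message that refuses to be deep-copied, and a user callable); they are assumptions made for the example, not Airflow code. Only the body of `__deepcopy__` mirrors the diff.

```python
import copy


class NoDeepCopy(object):
    """Hypothetical stand-in for an object that cannot be deep-copied."""

    def __deepcopy__(self, memo):
        raise TypeError("this object refuses to be deep-copied")


class Base(object):
    """Hypothetical stand-in for BaseOperator; only the copy logic is modelled."""

    # Attributes that every subclass copies shallowly.
    _base_shallow_copy_attrs = ('params',)
    # Subclasses override this to name further shallow-copy attributes.
    shallow_copy_attrs = ()

    def __deepcopy__(self, memo):
        cls = self.__class__
        result = cls.__new__(cls)
        memo[id(self)] = result
        shallow_copy = cls.shallow_copy_attrs + cls._base_shallow_copy_attrs
        for k, v in list(self.__dict__.items()):
            if k not in shallow_copy:
                setattr(result, k, copy.deepcopy(v, memo))
            else:
                # Shallow copy: a new container, but the same member objects.
                setattr(result, k, copy.copy(v))
        return result


class CallableTask(Base):
    """Hypothetical stand-in for PythonOperator."""

    shallow_copy_attrs = ('python_callable', 'op_kwargs')

    def __init__(self, python_callable, op_kwargs, tags):
        self.python_callable = python_callable
        self.op_kwargs = op_kwargs
        self.tags = tags
        self.params = {}


def say_hello(message):
    return message


task = CallableTask(say_hello, {'message': NoDeepCopy()}, tags=['a', 'b'])
# This would raise TypeError if op_kwargs were deep-copied.
clone = copy.deepcopy(task)
```

With this arrangement, `clone.op_kwargs` is a new dict whose values are the very same objects as in `task.op_kwargs`, so a value mutated through the copy is also changed in the original. That is acceptable only under the assumption stated in the commit's code comment, namely that the arguments are not mutated.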