Repository: incubator-airflow
Updated Branches:
  refs/heads/master 55b56a433 -> 07db7a3d7


[AIRFLOW-2652] implement / enhance baseOperator deepcopy

Make sure you have checked _all_ steps below.

### JIRA
- [x] My PR addresses the following [Airflow JIRA](https://issues.apache.org/jira/browse/AIRFLOW/)
issues and references them in the PR title. For
example, "\[AIRFLOW-XXX\] My Airflow PR"
    -
https://issues.apache.org/jira/browse/AIRFLOW-2652
    - In case you are fixing a typo in the
documentation you can prepend your commit with
\[AIRFLOW-XXX\], code changes always need a JIRA
issue.

### Description
- [x] Here are some details about my PR, including
screenshots of any UI changes:
When ``airflow backfill`` runs on a
PythonOperator, it triggers a deepcopy of the
task_instance. If some objects cannot be
deep-copied in certain Python versions (e.g.
Protobuf in Python 2.7), an exception is thrown.
We should make a shallow copy of such objects
instead of a deep copy.

This PR reworks the ``__deepcopy__`` method in
BaseOperator so that each operator can list
attributes to copy shallowly, and PythonOperator
uses this to skip deepcopy for `op_kwargs` and
`python_callable`.
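
The shallow-copy approach described above can be sketched outside Airflow as follows. This is a minimal illustration, not the Airflow code itself: the class names `Base`, `CallableTask`, and `Undeepcopyable`, the attribute `_base_shallow_copy_attrs`, and the `retries` list are hypothetical stand-ins chosen for the example. `Undeepcopyable` imitates an object (such as a protobuf message on Python 2.7) whose deep copy fails.

```python
import copy


class Undeepcopyable(object):
    """Hypothetical stand-in for an object that cannot be deep-copied."""

    def __deepcopy__(self, memo):
        raise TypeError("cannot deepcopy this object")


class Base(object):
    """Hypothetical base class mirroring the pattern in this commit."""

    # Attributes that every subclass copies shallowly.
    _base_shallow_copy_attrs = ('params',)
    # Subclasses override this to name their own shallow-copy attributes.
    shallow_copy_attrs = ()

    def __init__(self, params=None):
        self.params = params or {}

    def __deepcopy__(self, memo):
        cls = self.__class__
        result = cls.__new__(cls)
        memo[id(self)] = result
        shallow = cls.shallow_copy_attrs + cls._base_shallow_copy_attrs
        for k, v in list(self.__dict__.items()):
            if k in shallow:
                # Shallow copy: a new container, but the same inner objects.
                setattr(result, k, copy.copy(v))
            else:
                setattr(result, k, copy.deepcopy(v, memo))
        return result


class CallableTask(Base):
    """Hypothetical subclass that opts two attributes out of deepcopy."""

    shallow_copy_attrs = ('python_callable', 'op_kwargs')

    def __init__(self, python_callable, op_kwargs=None, **kwargs):
        super(CallableTask, self).__init__(**kwargs)
        self.python_callable = python_callable
        self.op_kwargs = op_kwargs or {}
        self.retries = [1, 2]  # an ordinary attribute, still deep-copied


def noop():
    return None


payload = Undeepcopyable()
task = CallableTask(noop, op_kwargs={'message': payload})

# Deep-copying the kwargs dict directly fails on the payload...
try:
    copy.deepcopy({'message': payload})
    direct_deepcopy_failed = False
except TypeError:
    direct_deepcopy_failed = True

# ...but deep-copying the task succeeds, because op_kwargs is only
# shallow-copied and the payload is shared by reference.
clone = copy.deepcopy(task)
```

Keeping the attribute names in a class-level tuple lets a subclass opt out of deepcopy for specific fields without overriding `__deepcopy__` itself. An object that raises from `__deepcopy__`, as in this sketch, is one possible way to exercise the failure case in a unit test.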

### Tests
- [x] My PR adds the following unit tests __OR__
does not need testing for this extremely good
reason:
I can't think of a good way to test this. We
encountered it in production.

### Commits
- [x] My commits all reference JIRA issues in
their subject lines, and I have squashed multiple
commits if they address the same issue. In
addition, my commits follow the guidelines from
"[How to write a good git commit
message](http://chris.beams.io/posts/git-commit/)":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not
"adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

### Documentation
- [x] In case of new functionality, my PR adds
documentation that describes how to use it.
    - When adding new operators/hooks/sensors, the
autoclass documentation generation needs to be
added.

### Code Quality
- [x] Passes `git diff upstream/master -u --
"*.py" | flake8 --diff`

Closes #3528 from feng-tao/airflow-2652


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/07db7a3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/07db7a3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/07db7a3d

Branch: refs/heads/master
Commit: 07db7a3d711500c421f73e57002e87b662a2ab0f
Parents: 55b56a4
Author: Tao feng <tf...@lyft.com>
Authored: Thu Jun 21 16:28:30 2018 -0700
Committer: Maxime Beauchemin <maximebeauche...@gmail.com>
Committed: Thu Jun 21 16:28:30 2018 -0700

----------------------------------------------------------------------
 airflow/models.py                    | 23 ++++++++++++++---------
 airflow/operators/python_operator.py |  4 ++++
 tests/operators/python_operator.py   | 21 +++++++++++++++++++--
 3 files changed, 37 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/07db7a3d/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 8f83586..381e9d3 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2317,6 +2317,15 @@ class BaseOperator(LoggingMixin):
     ui_color = '#fff'
     ui_fgcolor = '#000'
 
+    # base list which includes all the attrs that don't need deep copy.
+    _base_operator_shallow_copy_attrs = ('user_defined_macros',
+                                         'user_defined_filters',
+                                         'params',
+                                         '_log',)
+
+    # each operator should override this class attr for shallow copy attrs.
+    shallow_copy_attrs = ()
+
     @apply_defaults
     def __init__(
             self,
@@ -2683,17 +2692,13 @@ class BaseOperator(LoggingMixin):
         result = cls.__new__(cls)
         memo[id(self)] = result
 
+        shallow_copy = cls.shallow_copy_attrs + cls._base_operator_shallow_copy_attrs
+
         for k, v in list(self.__dict__.items()):
-            if k not in ('user_defined_macros', 'user_defined_filters',
-                         'params', '_log'):
+            if k not in shallow_copy:
                 setattr(result, k, copy.deepcopy(v, memo))
-        result.params = self.params
-        if hasattr(self, 'user_defined_macros'):
-            result.user_defined_macros = self.user_defined_macros
-        if hasattr(self, 'user_defined_filters'):
-            result.user_defined_filters = self.user_defined_filters
-        if hasattr(self, '_log'):
-            result._log = self._log
+            else:
+                setattr(result, k, copy.copy(v))
         return result
 
     def __getstate__(self):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/07db7a3d/airflow/operators/python_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/python_operator.py b/airflow/operators/python_operator.py
index 88f3b1a..a564897 100644
--- a/airflow/operators/python_operator.py
+++ b/airflow/operators/python_operator.py
@@ -65,6 +65,10 @@ class PythonOperator(BaseOperator):
     template_ext = tuple()
     ui_color = '#ffefeb'
 
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects(e.g protobuf).
+    shallow_copy_attrs = ('python_callable', 'op_kwargs',)
+
     @apply_defaults
     def __init__(
             self,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/07db7a3d/tests/operators/python_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/python_operator.py b/tests/operators/python_operator.py
index e44cfa4..43aa8a6 100644
--- a/tests/operators/python_operator.py
+++ b/tests/operators/python_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,6 +19,7 @@
 
 from __future__ import print_function, unicode_literals
 
+import copy
 import datetime
 import unittest
 
@@ -90,6 +91,22 @@ class PythonOperatorTest(unittest.TestCase):
                 task_id='python_operator',
                 dag=self.dag)
 
+    def test_python_operator_shallow_copy_attr(self):
+        not_callable = lambda x: x
+        original_task = PythonOperator(
+            python_callable=not_callable,
+            task_id='python_operator',
+            op_kwargs={'certain_attrs': ''},
+            dag=self.dag
+        )
+        new_task = copy.deepcopy(original_task)
+        # shallow copy op_kwargs
+        self.assertEquals(id(original_task.op_kwargs['certain_attrs']),
+                          id(new_task.op_kwargs['certain_attrs']))
+        # shallow copy python_callable
+        self.assertEquals(id(original_task.python_callable),
+                          id(new_task.python_callable))
+
 
 class BranchOperatorTest(unittest.TestCase):
     def setUp(self):
