This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch custom-operator-metaclass-rule
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ef4626ad68fd44c7cb0bf8fb7a3e5ff64c42d41c
Author: Daniel Imberman <daniel.imber...@gmail.com>
AuthorDate: Wed Nov 25 11:58:35 2020 -0800

    Add BaseOperatorMetaclassRule
    
    Adds an upgrade check rule that ensures that users are not using custom
    metaclasses in their custom operators
---
 .../rules/custom_operator_metaclass_rule.py        | 58 ++++++++++++++++++++++
 .../rules/test_custom_operator_metaclass_rule.py   | 55 ++++++++++++++++++++
 2 files changed, 113 insertions(+)

diff --git a/airflow/upgrade/rules/custom_operator_metaclass_rule.py 
b/airflow/upgrade/rules/custom_operator_metaclass_rule.py
new file mode 100644
index 0000000..b4061e0
--- /dev/null
+++ b/airflow/upgrade/rules/custom_operator_metaclass_rule.py
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import
+
+from airflow.models.dagbag import DagBag
+from airflow.upgrade.rules.base_rule import BaseRule
+from airflow.utils.db import provide_session
+
+
+def check_task_for_metaclasses(task):
+    class_type = type(task.__class__)
+    if class_type != type:
+        res = (
+            "Class {class_name} contained invalid custom metaclass "
+            "{metaclass_name}. Custom metaclasses for operators are not "
+            "allowed in Airflow 2.0. Please remove this custom 
metaclass.".format(
+                class_name=task.__class__, metaclass_name=class_type
+            )
+        )
+        return res
+    else:
+        return None
+
+
+class BaseOperatorMetaclassRule(BaseRule):
+    title = "Ensure users are not using custom metaclasses in custom operators"
+
+    description = """\
+In Airflow 2.0, we require that all custom operators use the BaseOperatorMeta 
metaclass.\
+To ensure this, we can no longer allow custom metaclasses in custom operators.
+    """
+
+    @provide_session
+    def check(self, session=None):
+        dagbag = DagBag(include_examples=True)
+        custom_metaclasses = []
+        for dag_id, dag in dagbag.dags.items():
+            for task in dag.tasks:
+                res = check_task_for_metaclasses(task)
+                if res:
+                    custom_metaclasses.append(res)
+
+        return custom_metaclasses
diff --git a/tests/upgrade/rules/test_custom_operator_metaclass_rule.py 
b/tests/upgrade/rules/test_custom_operator_metaclass_rule.py
new file mode 100644
index 0000000..510614c
--- /dev/null
+++ b/tests/upgrade/rules/test_custom_operator_metaclass_rule.py
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from unittest import TestCase
+
+from airflow.models.baseoperator import BaseOperator
+from airflow.upgrade.rules.custom_operator_metaclass_rule import (
+    BaseOperatorMetaclassRule,
+    check_task_for_metaclasses,
+)
+from six import with_metaclass
+
+
+class MyMeta(type):
+    pass
+
+
+class MyMetaOperator(with_metaclass(MyMeta, BaseOperator)):
+    def execute(self, context):
+        pass
+
+
+class TestBaseOperatorMetaclassRule(TestCase):
+    def test_individual_task(self):
+        task = MyMetaOperator(task_id="foo")
+        res = check_task_for_metaclasses(task)
+        expected_error = (
+            "Class <class 
'tests.upgrade.rules.test_custom_operator_metaclass_rule.MyMetaOperator'> "
+            "contained invalid custom metaclass <class "
+            
"'tests.upgrade.rules.test_custom_operator_metaclass_rule.MyMeta'>. "
+            "Custom metaclasses for operators are not allowed in Airflow 2.0. "
+            "Please remove this custom metaclass."
+        )
+        self.assertEqual(expected_error, res)
+
+    def test_check(self):
+        rule = BaseOperatorMetaclassRule()
+
+        assert isinstance(rule.description, str)
+        assert isinstance(rule.title, str)
+        msgs = rule.check()
+        self.assertEqual(msgs, [])

Reply via email to