This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 26a96fa76a Operator name separate from class (#22834)
26a96fa76a is described below

commit 26a96fa76a41a6abc9f276e8ac32f21bde6ff022
Author: Mark Norman Francis <[email protected]>
AuthorDate: Wed Aug 17 09:23:27 2022 +0100

    Operator name separate from class (#22834)
    
    It should be more helpful to display '@task.virtualenv' rather than
    '_PythonVirtualenvOperator' when looking at dag run details.
---
 airflow/api_connexion/schemas/task_schema.py        |  4 ++++
 airflow/decorators/base.py                          |  7 +++++++
 airflow/decorators/python.py                        |  2 ++
 airflow/decorators/python_virtualenv.py             |  2 ++
 airflow/models/abstractoperator.py                  |  4 ++++
 airflow/models/baseoperator.py                      |  9 +++++++++
 airflow/models/mappedoperator.py                    | 11 +++++++++++
 airflow/serialization/serialized_objects.py         | 12 ++++++++++++
 airflow/www/views.py                                |  4 ++--
 docs/apache-airflow/howto/custom-operator.rst       |  2 ++
 tests/api_connexion/endpoints/test_task_endpoint.py |  7 +++++++
 tests/api_connexion/schemas/test_task_schema.py     |  2 ++
 tests/models/test_taskinstance.py                   |  2 ++
 tests/serialization/test_dag_serialization.py       | 11 +++++++++++
 tests/test_utils/mock_operators.py                  |  1 +
 15 files changed, 78 insertions(+), 2 deletions(-)

diff --git a/airflow/api_connexion/schemas/task_schema.py b/airflow/api_connexion/schemas/task_schema.py
index aa9a470308..4815dffb72 100644
--- a/airflow/api_connexion/schemas/task_schema.py
+++ b/airflow/api_connexion/schemas/task_schema.py
@@ -33,6 +33,7 @@ class TaskSchema(Schema):
     """Task schema"""
 
     class_ref = fields.Method("_get_class_reference", dump_only=True)
+    operator_name = fields.Method("_get_operator_name", dump_only=True)
     task_id = fields.String(dump_only=True)
     owner = fields.String(dump_only=True)
     start_date = fields.DateTime(dump_only=True)
@@ -64,6 +65,9 @@ class TaskSchema(Schema):
         result = ClassReferenceSchema().dump(obj)
         return result.data if hasattr(result, "data") else result
 
+    def _get_operator_name(self, obj):
+        return obj.operator_name
+
     @staticmethod
     def get_params(obj):
         """Get the Params defined in a Task"""
diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py
index cfc94b525e..4e43fa664e 100644
--- a/airflow/decorators/base.py
+++ b/airflow/decorators/base.py
@@ -391,6 +391,12 @@ class _TaskDecorator(Generic[FParams, FReturn, OperatorSubclass]):
 
         # Mypy does not work well with a subclassed attrs class :(
         _MappedOperator = cast(Any, DecoratedMappedOperator)
+
+        try:
+            operator_name = self.operator_class.custom_operator_name  # type: ignore
+        except AttributeError:
+            operator_name = self.operator_class.__name__
+
         operator = _MappedOperator(
             operator_class=self.operator_class,
             expand_input=EXPAND_INPUT_EMPTY,  # Don't use this; mapped values go to op_kwargs_expand_input.
@@ -407,6 +413,7 @@ class _TaskDecorator(Generic[FParams, FReturn, OperatorSubclass]):
             is_empty=False,
             task_module=self.operator_class.__module__,
             task_type=self.operator_class.__name__,
+            operator_name=operator_name,
             dag=dag,
             task_group=task_group,
             start_date=start_date,
diff --git a/airflow/decorators/python.py b/airflow/decorators/python.py
index 3f00681ccf..020837930f 100644
--- a/airflow/decorators/python.py
+++ b/airflow/decorators/python.py
@@ -41,6 +41,8 @@ class _PythonDecoratedOperator(DecoratedOperator, PythonOperator):
     # there are some cases we can't deepcopy the objects (e.g protobuf).
     shallow_copy_attrs: Sequence[str] = ('python_callable',)
 
+    custom_operator_name: str = '@task'
+
     def __init__(self, *, python_callable, op_args, op_kwargs, **kwargs) -> None:
         kwargs_to_upstream = {
             "python_callable": python_callable,
diff --git a/airflow/decorators/python_virtualenv.py b/airflow/decorators/python_virtualenv.py
index dba306776c..ec954fbc4f 100644
--- a/airflow/decorators/python_virtualenv.py
+++ b/airflow/decorators/python_virtualenv.py
@@ -44,6 +44,8 @@ class _PythonVirtualenvDecoratedOperator(DecoratedOperator, PythonVirtualenvOper
     # there are some cases we can't deepcopy the objects (e.g protobuf).
     shallow_copy_attrs: Sequence[str] = ('python_callable',)
 
+    custom_operator_name: str = '@task.virtualenv'
+
     def __init__(self, *, python_callable, op_args, op_kwargs, **kwargs) -> None:
         kwargs_to_upstream = {
             "python_callable": python_callable,
diff --git a/airflow/models/abstractoperator.py b/airflow/models/abstractoperator.py
index bf9a32bfee..b221cad160 100644
--- a/airflow/models/abstractoperator.py
+++ b/airflow/models/abstractoperator.py
@@ -131,6 +131,10 @@ class AbstractOperator(LoggingMixin, DAGNode):
     def task_type(self) -> str:
         raise NotImplementedError()
 
+    @property
+    def operator_name(self) -> str:
+        raise NotImplementedError()
+
     @property
     def inherits_from_empty_operator(self) -> bool:
         raise NotImplementedError()
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 395e1d3b67..edb5af6494 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -1354,6 +1354,14 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
         """@property: type of the task"""
         return self.__class__.__name__
 
+    @property
+    def operator_name(self) -> str:
+        """@property: use a more friendly display name for the operator, if set"""
+        try:
+            return self.custom_operator_name  # type: ignore
+        except AttributeError:
+            return self.task_type
+
     @property
     def roots(self) -> List["BaseOperator"]:
         """Required by DAGNode."""
@@ -1458,6 +1466,7 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
                     'start_date',
                     'end_date',
                     '_task_type',
+                    '_operator_name',
                     'subdag',
                     'ui_color',
                     'ui_fgcolor',
diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py
index 2a93e442b7..24c6629550 100644
--- a/airflow/models/mappedoperator.py
+++ b/airflow/models/mappedoperator.py
@@ -214,6 +214,11 @@ class OperatorPartial:
         start_date = partial_kwargs.pop("start_date")
         end_date = partial_kwargs.pop("end_date")
 
+        try:
+            operator_name = self.operator_class.custom_operator_name  # type: ignore
+        except AttributeError:
+            operator_name = self.operator_class.__name__
+
         op = MappedOperator(
             operator_class=self.operator_class,
             expand_input=expand_input,
@@ -230,6 +235,7 @@ class OperatorPartial:
             is_empty=issubclass(self.operator_class, EmptyOperator),
             task_module=self.operator_class.__module__,
             task_type=self.operator_class.__name__,
+            operator_name=operator_name,
             dag=dag,
             task_group=task_group,
             start_date=start_date,
@@ -279,6 +285,7 @@ class MappedOperator(AbstractOperator):
     _is_empty: bool
     _task_module: str
     _task_type: str
+    _operator_name: str
 
     dag: Optional["DAG"]
     task_group: Optional["TaskGroup"]
@@ -378,6 +385,10 @@ class MappedOperator(AbstractOperator):
         """Implementing Operator."""
         return self._task_type
 
+    @property
+    def operator_name(self) -> str:
+        return self._operator_name
+
     @property
     def inherits_from_empty_operator(self) -> bool:
         """Implementing Operator."""
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 82946bb389..6b073692f0 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -640,6 +640,16 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
     def task_type(self, task_type: str):
         self._task_type = task_type
 
+    @property
+    def operator_name(self) -> str:
+        # Overwrites operator_name of BaseOperator to use _operator_name instead of
+        # __class__.operator_name.
+        return self._operator_name
+
+    @operator_name.setter
+    def operator_name(self, operator_name: str):
+        self._operator_name = operator_name
+
     @classmethod
     def serialize_mapped_operator(cls, op: MappedOperator) -> Dict[str, Any]:
         serialized_op = cls._serialize_node(op, include_deps=op.deps != MappedOperator.deps_for(BaseOperator))
@@ -674,6 +684,7 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
         serialize_op = cls.serialize_to_json(op, cls._decorated_fields)
         serialize_op['_task_type'] = getattr(op, "_task_type", type(op).__name__)
         serialize_op['_task_module'] = getattr(op, "_task_module", type(op).__module__)
+        serialize_op['_operator_name'] = op.operator_name
 
         # Used to determine if an Operator is inherited from EmptyOperator
         serialize_op['_is_empty'] = op.inherits_from_empty_operator
@@ -846,6 +857,7 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
                 is_empty=False,
                 task_module=encoded_op["_task_module"],
                 task_type=encoded_op["_task_type"],
+                operator_name=encoded_op["_operator_name"],
                 dag=None,
                 task_group=None,
                 start_date=None,
diff --git a/airflow/www/views.py b/airflow/www/views.py
index d4207d2743..c47c7937f0 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -364,7 +364,7 @@ def dag_to_grid(dag, dag_runs, session):
                 'extra_links': item.extra_links,
                 'is_mapped': item.is_mapped,
                 'has_outlet_datasets': any(isinstance(i, Dataset) for i in getattr(item, "_outlets", [])),
-                'operator': item.task_type,
+                'operator': item.operator_name,
             }
 
         # Task Group
@@ -3467,7 +3467,7 @@ class Airflow(AirflowBaseView):
             task_dict['end_date'] = end_date
             task_dict['start_date'] = task_dict['start_date'] or end_date
             task_dict['state'] = State.FAILED
-            task_dict['operator'] = task.task_type
+            task_dict['operator'] = task.operator_name
             task_dict['try_number'] = try_count
             task_dict['extraLinks'] = task.extra_links
             task_dict['execution_date'] = dttm.isoformat()
diff --git a/docs/apache-airflow/howto/custom-operator.rst b/docs/apache-airflow/howto/custom-operator.rst
index 1fe8b3e2da..3f0096c621 100644
--- a/docs/apache-airflow/howto/custom-operator.rst
+++ b/docs/apache-airflow/howto/custom-operator.rst
@@ -135,12 +135,14 @@ User interface
 Airflow also allows the developer to control how the operator shows up in the DAG UI.
 Override ``ui_color`` to change the background color of the operator in UI.
 Override ``ui_fgcolor`` to change the color of the label.
+Override ``custom_operator_name`` to change the displayed name to something other than the classname.
 
 .. code-block:: python
 
         class HelloOperator(BaseOperator):
             ui_color = "#ff0000"
             ui_fgcolor = "#000000"
+            custom_operator_name = "Howdy"
             # ...
 
 Templating
diff --git a/tests/api_connexion/endpoints/test_task_endpoint.py b/tests/api_connexion/endpoints/test_task_endpoint.py
index 4151a4b60f..e1fe922a2a 100644
--- a/tests/api_connexion/endpoints/test_task_endpoint.py
+++ b/tests/api_connexion/endpoints/test_task_endpoint.py
@@ -106,6 +106,7 @@ class TestGetTask(TestTaskEndpoint):
             "end_date": None,
             "execution_timeout": None,
             "extra_links": [],
+            "operator_name": "EmptyOperator",
             "owner": "airflow",
             'params': {
                 'foo': {
@@ -147,6 +148,7 @@ class TestGetTask(TestTaskEndpoint):
             "execution_timeout": None,
             "extra_links": [],
             "is_mapped": True,
+            "operator_name": "EmptyOperator",
             "owner": "airflow",
             "params": {},
             "pool": "default_pool",
@@ -191,6 +193,7 @@ class TestGetTask(TestTaskEndpoint):
             "end_date": None,
             "execution_timeout": None,
             "extra_links": [],
+            "operator_name": "EmptyOperator",
             "owner": "airflow",
             'params': {
                 'foo': {
@@ -257,6 +260,7 @@ class TestGetTasks(TestTaskEndpoint):
                     "end_date": None,
                     "execution_timeout": None,
                     "extra_links": [],
+                    "operator_name": "EmptyOperator",
                     "owner": "airflow",
                     'params': {
                         'foo': {
@@ -293,6 +297,7 @@ class TestGetTasks(TestTaskEndpoint):
                     "end_date": None,
                     "execution_timeout": None,
                     "extra_links": [],
+                    "operator_name": "EmptyOperator",
                     "owner": "airflow",
                     "params": {},
                     "pool": "default_pool",
@@ -332,6 +337,7 @@ class TestGetTasks(TestTaskEndpoint):
                     "execution_timeout": None,
                     "extra_links": [],
                     "is_mapped": True,
+                    "operator_name": "EmptyOperator",
                     "owner": "airflow",
                     "params": {},
                     "pool": "default_pool",
@@ -360,6 +366,7 @@ class TestGetTasks(TestTaskEndpoint):
                     "end_date": None,
                     "execution_timeout": None,
                     "extra_links": [],
+                    "operator_name": "EmptyOperator",
                     "owner": "airflow",
                     "params": {},
                     "pool": "default_pool",
diff --git a/tests/api_connexion/schemas/test_task_schema.py b/tests/api_connexion/schemas/test_task_schema.py
index 07f4b592da..97a46f2be5 100644
--- a/tests/api_connexion/schemas/test_task_schema.py
+++ b/tests/api_connexion/schemas/test_task_schema.py
@@ -40,6 +40,7 @@ class TestTaskSchema:
             "execution_timeout": None,
             "extra_links": [],
             "owner": "airflow",
+            "operator_name": "EmptyOperator",
             "params": {},
             "pool": "default_pool",
             "pool_slots": 1.0,
@@ -78,6 +79,7 @@ class TestTaskCollectionSchema:
                     "end_date": None,
                     "execution_timeout": None,
                     "extra_links": [],
+                    "operator_name": "EmptyOperator",
                     "owner": "airflow",
                     'params': {
                         'foo': {
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 0ec52c4a05..a8b8aacad0 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -2611,6 +2611,7 @@ class TestTaskInstance:
 
         ti = create_task_instance()
         assert ti.task.task_type == 'EmptyOperator'
+        assert ti.task.operator_name == 'EmptyOperator'
 
         # Verify that ti.operator field renders correctly "without" Serialization
         assert ti.operator == "EmptyOperator"
@@ -2621,6 +2622,7 @@ class TestTaskInstance:
         # Verify that ti.operator field renders correctly "with" Serialization
         ser_ti = TI(task=deserialized_op, run_id=None)
         assert ser_ti.operator == "EmptyOperator"
+        assert ser_ti.task.operator_name == 'EmptyOperator'
 
 
 @pytest.mark.parametrize("pool_override", [None, "test_pool2"])
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 9cfc8e1bed..17e4ef5764 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -153,6 +153,7 @@ serialized_simple_dag_ground_truth = {
                 "template_fields_renderers": {'bash_command': 'bash', 'env': 
'json'},
                 "bash_command": "echo {{ task.task_id }}",
                 "_task_type": "BashOperator",
+                "_operator_name": "BashOperator",
                 "_task_module": "airflow.operators.bash",
                 "pool": "default_pool",
                 "executor_config": {
@@ -183,6 +184,7 @@ serialized_simple_dag_ground_truth = {
                 "template_fields": ['bash_command'],
                 "template_fields_renderers": {},
                 "_task_type": "CustomOperator",
+                "_operator_name": "@custom",
                 "_task_module": "tests.test_utils.mock_operators",
                 "pool": "default_pool",
             },
@@ -537,6 +539,7 @@ class TestStringifiedDAGs:
         fields_to_check = task.get_serialized_fields() - {
             # Checked separately
             '_task_type',
+            '_operator_name',
             'subdag',
             # Type is excluded, so don't check it
             '_log',
@@ -1850,6 +1853,7 @@ def test_operator_expand_serde():
         '_is_mapped': True,
         '_task_module': 'airflow.operators.bash',
         '_task_type': 'BashOperator',
+        '_operator_name': 'BashOperator',
         'downstream_task_ids': [],
         'expand_input': {
             "type": "dict-of-lists",
@@ -1881,6 +1885,7 @@ def test_operator_expand_serde():
 
     assert op.operator_class == {
         '_task_type': 'BashOperator',
+        '_operator_name': 'BashOperator',
         'downstream_task_ids': [],
         'task_id': 'a',
         'template_ext': ['.sh', '.bash'],
@@ -1907,6 +1912,7 @@ def test_operator_expand_xcomarg_serde():
         '_is_mapped': True,
         '_task_module': 'tests.test_utils.mock_operators',
         '_task_type': 'MockOperator',
+        '_operator_name': 'MockOperator',
         'downstream_task_ids': [],
         'expand_input': {
             "type": "dict-of-lists",
@@ -1956,6 +1962,7 @@ def test_operator_expand_kwargs_serde(strict):
         '_is_mapped': True,
         '_task_module': 'tests.test_utils.mock_operators',
         '_task_type': 'MockOperator',
+        '_operator_name': 'MockOperator',
         'downstream_task_ids': [],
         'expand_input': {
             "type": "list-of-dicts",
@@ -2041,6 +2048,7 @@ def test_taskflow_expand_serde():
         def x(arg1, arg2, arg3):
             print(arg1, arg2, arg3)
 
+        print('**', type(x), type(x.partial), type(x.expand))
         x.partial(arg1=[1, 2, {"a": "b"}]).expand(arg2={"a": 1, "b": 2}, arg3=XComArg(op1))
 
     original = dag.get_task("x")
@@ -2051,6 +2059,7 @@ def test_taskflow_expand_serde():
         '_is_mapped': True,
         '_task_module': 'airflow.decorators.python',
         '_task_type': '_PythonDecoratedOperator',
+        '_operator_name': '@task',
         'downstream_task_ids': [],
         'partial_kwargs': {
             'op_args': [],
@@ -2136,6 +2145,7 @@ def test_taskflow_expand_kwargs_serde(strict):
         '_is_mapped': True,
         '_task_module': 'airflow.decorators.python',
         '_task_type': '_PythonDecoratedOperator',
+        '_operator_name': '@task',
         'downstream_task_ids': [],
         'partial_kwargs': {
             'op_args': [],
@@ -2225,6 +2235,7 @@ def test_dummy_operator_serde(is_inherit):
         '_is_empty': is_inherit,
         '_task_module': 'tests.serialization.test_dag_serialization',
         '_task_type': 'MyDummyOperator',
+        '_operator_name': 'MyDummyOperator',
         '_outlets': [],
         '_inlets': [],
         'downstream_task_ids': [],
diff --git a/tests/test_utils/mock_operators.py b/tests/test_utils/mock_operators.py
index c21393fd62..d9b5970b2e 100644
--- a/tests/test_utils/mock_operators.py
+++ b/tests/test_utils/mock_operators.py
@@ -100,6 +100,7 @@ class CustomOpLink(BaseOperatorLink):
 class CustomOperator(BaseOperator):
 
     template_fields = ['bash_command']
+    custom_operator_name = '@custom'
 
     @property
     def operator_extra_links(self):

Reply via email to