This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 26a96fa76a Operator name separate from class (#22834)
26a96fa76a is described below
commit 26a96fa76a41a6abc9f276e8ac32f21bde6ff022
Author: Mark Norman Francis <[email protected]>
AuthorDate: Wed Aug 17 09:23:27 2022 +0100
Operator name separate from class (#22834)
It is more helpful to display '@task.virtualenv' than the underlying class name
'_PythonVirtualenvDecoratedOperator' when looking at DAG run details.
---
airflow/api_connexion/schemas/task_schema.py | 4 ++++
airflow/decorators/base.py | 7 +++++++
airflow/decorators/python.py | 2 ++
airflow/decorators/python_virtualenv.py | 2 ++
airflow/models/abstractoperator.py | 4 ++++
airflow/models/baseoperator.py | 9 +++++++++
airflow/models/mappedoperator.py | 11 +++++++++++
airflow/serialization/serialized_objects.py | 12 ++++++++++++
airflow/www/views.py | 4 ++--
docs/apache-airflow/howto/custom-operator.rst | 2 ++
tests/api_connexion/endpoints/test_task_endpoint.py | 7 +++++++
tests/api_connexion/schemas/test_task_schema.py | 2 ++
tests/models/test_taskinstance.py | 2 ++
tests/serialization/test_dag_serialization.py | 11 +++++++++++
tests/test_utils/mock_operators.py | 1 +
15 files changed, 78 insertions(+), 2 deletions(-)
diff --git a/airflow/api_connexion/schemas/task_schema.py
b/airflow/api_connexion/schemas/task_schema.py
index aa9a470308..4815dffb72 100644
--- a/airflow/api_connexion/schemas/task_schema.py
+++ b/airflow/api_connexion/schemas/task_schema.py
@@ -33,6 +33,7 @@ class TaskSchema(Schema):
"""Task schema"""
class_ref = fields.Method("_get_class_reference", dump_only=True)
+ operator_name = fields.Method("_get_operator_name", dump_only=True)
task_id = fields.String(dump_only=True)
owner = fields.String(dump_only=True)
start_date = fields.DateTime(dump_only=True)
@@ -64,6 +65,9 @@ class TaskSchema(Schema):
result = ClassReferenceSchema().dump(obj)
return result.data if hasattr(result, "data") else result
+ def _get_operator_name(self, obj):
+ return obj.operator_name
+
@staticmethod
def get_params(obj):
"""Get the Params defined in a Task"""
diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py
index cfc94b525e..4e43fa664e 100644
--- a/airflow/decorators/base.py
+++ b/airflow/decorators/base.py
@@ -391,6 +391,12 @@ class _TaskDecorator(Generic[FParams, FReturn, OperatorSubclass]):
# Mypy does not work well with a subclassed attrs class :(
_MappedOperator = cast(Any, DecoratedMappedOperator)
+
+ # Prefer the operator class's display name (e.g. '@task'); fall back to its class name.
+ operator_name = getattr(
+ self.operator_class, "custom_operator_name", self.operator_class.__name__
+ )
+
operator = _MappedOperator(
operator_class=self.operator_class,
expand_input=EXPAND_INPUT_EMPTY, # Don't use this; mapped values go to op_kwargs_expand_input.
@@ -407,6 +413,7 @@ class _TaskDecorator(Generic[FParams, FReturn, OperatorSubclass]):
is_empty=False,
task_module=self.operator_class.__module__,
task_type=self.operator_class.__name__,
+ operator_name=operator_name,
dag=dag,
task_group=task_group,
start_date=start_date,
diff --git a/airflow/decorators/python.py b/airflow/decorators/python.py
index 3f00681ccf..020837930f 100644
--- a/airflow/decorators/python.py
+++ b/airflow/decorators/python.py
@@ -41,6 +41,8 @@ class _PythonDecoratedOperator(DecoratedOperator,
PythonOperator):
# there are some cases we can't deepcopy the objects (e.g protobuf).
shallow_copy_attrs: Sequence[str] = ('python_callable',)
+ custom_operator_name: str = '@task'
+
def __init__(self, *, python_callable, op_args, op_kwargs, **kwargs) ->
None:
kwargs_to_upstream = {
"python_callable": python_callable,
diff --git a/airflow/decorators/python_virtualenv.py
b/airflow/decorators/python_virtualenv.py
index dba306776c..ec954fbc4f 100644
--- a/airflow/decorators/python_virtualenv.py
+++ b/airflow/decorators/python_virtualenv.py
@@ -44,6 +44,8 @@ class _PythonVirtualenvDecoratedOperator(DecoratedOperator,
PythonVirtualenvOper
# there are some cases we can't deepcopy the objects (e.g protobuf).
shallow_copy_attrs: Sequence[str] = ('python_callable',)
+ custom_operator_name: str = '@task.virtualenv'
+
def __init__(self, *, python_callable, op_args, op_kwargs, **kwargs) ->
None:
kwargs_to_upstream = {
"python_callable": python_callable,
diff --git a/airflow/models/abstractoperator.py
b/airflow/models/abstractoperator.py
index bf9a32bfee..b221cad160 100644
--- a/airflow/models/abstractoperator.py
+++ b/airflow/models/abstractoperator.py
@@ -131,6 +131,10 @@ class AbstractOperator(LoggingMixin, DAGNode):
def task_type(self) -> str:
raise NotImplementedError()
+ @property
+ def operator_name(self) -> str:
+ raise NotImplementedError()
+
@property
def inherits_from_empty_operator(self) -> bool:
raise NotImplementedError()
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 395e1d3b67..edb5af6494 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -1354,6 +1354,14 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
"""@property: type of the task"""
return self.__class__.__name__
+ @property
+ def operator_name(self) -> str:
+ """@property: use a more friendly display name for the operator, if
set"""
+ try:
+ return self.custom_operator_name # type: ignore
+ except AttributeError:
+ return self.task_type
+
@property
def roots(self) -> List["BaseOperator"]:
"""Required by DAGNode."""
@@ -1458,6 +1466,7 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
'start_date',
'end_date',
'_task_type',
+ '_operator_name',
'subdag',
'ui_color',
'ui_fgcolor',
diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py
index 2a93e442b7..24c6629550 100644
--- a/airflow/models/mappedoperator.py
+++ b/airflow/models/mappedoperator.py
@@ -214,6 +214,11 @@ class OperatorPartial:
start_date = partial_kwargs.pop("start_date")
end_date = partial_kwargs.pop("end_date")
+ # Prefer the operator class's display name (e.g. '@task'); fall back to its class name.
+ operator_name = getattr(
+ self.operator_class, "custom_operator_name", self.operator_class.__name__
+ )
+
op = MappedOperator(
operator_class=self.operator_class,
expand_input=expand_input,
@@ -230,6 +235,7 @@ class OperatorPartial:
is_empty=issubclass(self.operator_class, EmptyOperator),
task_module=self.operator_class.__module__,
task_type=self.operator_class.__name__,
+ operator_name=operator_name,
dag=dag,
task_group=task_group,
start_date=start_date,
@@ -279,6 +285,7 @@ class MappedOperator(AbstractOperator):
_is_empty: bool
_task_module: str
_task_type: str
+ _operator_name: str
dag: Optional["DAG"]
task_group: Optional["TaskGroup"]
@@ -378,6 +385,10 @@ class MappedOperator(AbstractOperator):
"""Implementing Operator."""
return self._task_type
+ @property
+ def operator_name(self) -> str:
+ return self._operator_name
+
@property
def inherits_from_empty_operator(self) -> bool:
"""Implementing Operator."""
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 82946bb389..6b073692f0 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -640,6 +640,16 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
def task_type(self, task_type: str):
self._task_type = task_type
+ @property
+ def operator_name(self) -> str:
+ # Overwrites BaseOperator.operator_name to read _operator_name. DAGs serialized
+ # before _operator_name existed lack it, so fall back to the task type.
+ return getattr(self, "_operator_name", self.task_type)
+
+ @operator_name.setter
+ def operator_name(self, operator_name: str):
+ self._operator_name = operator_name
+
@classmethod
def serialize_mapped_operator(cls, op: MappedOperator) -> Dict[str, Any]:
serialized_op = cls._serialize_node(op, include_deps=op.deps != MappedOperator.deps_for(BaseOperator))
@@ -674,6 +684,7 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
serialize_op = cls.serialize_to_json(op, cls._decorated_fields)
serialize_op['_task_type'] = getattr(op, "_task_type", type(op).__name__)
serialize_op['_task_module'] = getattr(op, "_task_module", type(op).__module__)
+ serialize_op['_operator_name'] = op.operator_name
# Used to determine if an Operator is inherited from EmptyOperator
serialize_op['_is_empty'] = op.inherits_from_empty_operator
@@ -846,6 +857,7 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
is_empty=False,
task_module=encoded_op["_task_module"],
task_type=encoded_op["_task_type"],
+ operator_name=encoded_op.get("_operator_name", encoded_op["_task_type"]),
dag=None,
task_group=None,
start_date=None,
diff --git a/airflow/www/views.py b/airflow/www/views.py
index d4207d2743..c47c7937f0 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -364,7 +364,7 @@ def dag_to_grid(dag, dag_runs, session):
'extra_links': item.extra_links,
'is_mapped': item.is_mapped,
'has_outlet_datasets': any(isinstance(i, Dataset) for i in
getattr(item, "_outlets", [])),
- 'operator': item.task_type,
+ 'operator': item.operator_name,
}
# Task Group
@@ -3467,7 +3467,7 @@ class Airflow(AirflowBaseView):
task_dict['end_date'] = end_date
task_dict['start_date'] = task_dict['start_date'] or end_date
task_dict['state'] = State.FAILED
- task_dict['operator'] = task.task_type
+ task_dict['operator'] = task.operator_name
task_dict['try_number'] = try_count
task_dict['extraLinks'] = task.extra_links
task_dict['execution_date'] = dttm.isoformat()
diff --git a/docs/apache-airflow/howto/custom-operator.rst
b/docs/apache-airflow/howto/custom-operator.rst
index 1fe8b3e2da..3f0096c621 100644
--- a/docs/apache-airflow/howto/custom-operator.rst
+++ b/docs/apache-airflow/howto/custom-operator.rst
@@ -135,12 +135,14 @@ User interface
Airflow also allows the developer to control how the operator shows up in the
DAG UI.
Override ``ui_color`` to change the background color of the operator in UI.
Override ``ui_fgcolor`` to change the color of the label.
+Override ``custom_operator_name`` to change the displayed name to something other than the class name.
.. code-block:: python
class HelloOperator(BaseOperator):
ui_color = "#ff0000"
ui_fgcolor = "#000000"
+ custom_operator_name = "Howdy"
# ...
Templating
diff --git a/tests/api_connexion/endpoints/test_task_endpoint.py
b/tests/api_connexion/endpoints/test_task_endpoint.py
index 4151a4b60f..e1fe922a2a 100644
--- a/tests/api_connexion/endpoints/test_task_endpoint.py
+++ b/tests/api_connexion/endpoints/test_task_endpoint.py
@@ -106,6 +106,7 @@ class TestGetTask(TestTaskEndpoint):
"end_date": None,
"execution_timeout": None,
"extra_links": [],
+ "operator_name": "EmptyOperator",
"owner": "airflow",
'params': {
'foo': {
@@ -147,6 +148,7 @@ class TestGetTask(TestTaskEndpoint):
"execution_timeout": None,
"extra_links": [],
"is_mapped": True,
+ "operator_name": "EmptyOperator",
"owner": "airflow",
"params": {},
"pool": "default_pool",
@@ -191,6 +193,7 @@ class TestGetTask(TestTaskEndpoint):
"end_date": None,
"execution_timeout": None,
"extra_links": [],
+ "operator_name": "EmptyOperator",
"owner": "airflow",
'params': {
'foo': {
@@ -257,6 +260,7 @@ class TestGetTasks(TestTaskEndpoint):
"end_date": None,
"execution_timeout": None,
"extra_links": [],
+ "operator_name": "EmptyOperator",
"owner": "airflow",
'params': {
'foo': {
@@ -293,6 +297,7 @@ class TestGetTasks(TestTaskEndpoint):
"end_date": None,
"execution_timeout": None,
"extra_links": [],
+ "operator_name": "EmptyOperator",
"owner": "airflow",
"params": {},
"pool": "default_pool",
@@ -332,6 +337,7 @@ class TestGetTasks(TestTaskEndpoint):
"execution_timeout": None,
"extra_links": [],
"is_mapped": True,
+ "operator_name": "EmptyOperator",
"owner": "airflow",
"params": {},
"pool": "default_pool",
@@ -360,6 +366,7 @@ class TestGetTasks(TestTaskEndpoint):
"end_date": None,
"execution_timeout": None,
"extra_links": [],
+ "operator_name": "EmptyOperator",
"owner": "airflow",
"params": {},
"pool": "default_pool",
diff --git a/tests/api_connexion/schemas/test_task_schema.py
b/tests/api_connexion/schemas/test_task_schema.py
index 07f4b592da..97a46f2be5 100644
--- a/tests/api_connexion/schemas/test_task_schema.py
+++ b/tests/api_connexion/schemas/test_task_schema.py
@@ -40,6 +40,7 @@ class TestTaskSchema:
"execution_timeout": None,
"extra_links": [],
"owner": "airflow",
+ "operator_name": "EmptyOperator",
"params": {},
"pool": "default_pool",
"pool_slots": 1.0,
@@ -78,6 +79,7 @@ class TestTaskCollectionSchema:
"end_date": None,
"execution_timeout": None,
"extra_links": [],
+ "operator_name": "EmptyOperator",
"owner": "airflow",
'params': {
'foo': {
diff --git a/tests/models/test_taskinstance.py
b/tests/models/test_taskinstance.py
index 0ec52c4a05..a8b8aacad0 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -2611,6 +2611,7 @@ class TestTaskInstance:
ti = create_task_instance()
assert ti.task.task_type == 'EmptyOperator'
+ assert ti.task.operator_name == 'EmptyOperator'
# Verify that ti.operator field renders correctly "without"
Serialization
assert ti.operator == "EmptyOperator"
@@ -2621,6 +2622,7 @@ class TestTaskInstance:
# Verify that ti.operator field renders correctly "with" Serialization
ser_ti = TI(task=deserialized_op, run_id=None)
assert ser_ti.operator == "EmptyOperator"
+ assert ser_ti.task.operator_name == 'EmptyOperator'
@pytest.mark.parametrize("pool_override", [None, "test_pool2"])
diff --git a/tests/serialization/test_dag_serialization.py
b/tests/serialization/test_dag_serialization.py
index 9cfc8e1bed..17e4ef5764 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -153,6 +153,7 @@ serialized_simple_dag_ground_truth = {
"template_fields_renderers": {'bash_command': 'bash', 'env':
'json'},
"bash_command": "echo {{ task.task_id }}",
"_task_type": "BashOperator",
+ "_operator_name": "BashOperator",
"_task_module": "airflow.operators.bash",
"pool": "default_pool",
"executor_config": {
@@ -183,6 +184,7 @@ serialized_simple_dag_ground_truth = {
"template_fields": ['bash_command'],
"template_fields_renderers": {},
"_task_type": "CustomOperator",
+ "_operator_name": "@custom",
"_task_module": "tests.test_utils.mock_operators",
"pool": "default_pool",
},
@@ -537,6 +539,7 @@ class TestStringifiedDAGs:
fields_to_check = task.get_serialized_fields() - {
# Checked separately
'_task_type',
+ '_operator_name',
'subdag',
# Type is excluded, so don't check it
'_log',
@@ -1850,6 +1853,7 @@ def test_operator_expand_serde():
'_is_mapped': True,
'_task_module': 'airflow.operators.bash',
'_task_type': 'BashOperator',
+ '_operator_name': 'BashOperator',
'downstream_task_ids': [],
'expand_input': {
"type": "dict-of-lists",
@@ -1881,6 +1885,7 @@ def test_operator_expand_serde():
assert op.operator_class == {
'_task_type': 'BashOperator',
+ '_operator_name': 'BashOperator',
'downstream_task_ids': [],
'task_id': 'a',
'template_ext': ['.sh', '.bash'],
@@ -1907,6 +1912,7 @@ def test_operator_expand_xcomarg_serde():
'_is_mapped': True,
'_task_module': 'tests.test_utils.mock_operators',
'_task_type': 'MockOperator',
+ '_operator_name': 'MockOperator',
'downstream_task_ids': [],
'expand_input': {
"type": "dict-of-lists",
@@ -1956,6 +1962,7 @@ def test_operator_expand_kwargs_serde(strict):
'_is_mapped': True,
'_task_module': 'tests.test_utils.mock_operators',
'_task_type': 'MockOperator',
+ '_operator_name': 'MockOperator',
'downstream_task_ids': [],
'expand_input': {
"type": "list-of-dicts",
@@ -2051,6 +2058,7 @@ def test_taskflow_expand_serde():
'_is_mapped': True,
'_task_module': 'airflow.decorators.python',
'_task_type': '_PythonDecoratedOperator',
+ '_operator_name': '@task',
'downstream_task_ids': [],
'partial_kwargs': {
'op_args': [],
@@ -2136,6 +2144,7 @@ def test_taskflow_expand_kwargs_serde(strict):
'_is_mapped': True,
'_task_module': 'airflow.decorators.python',
'_task_type': '_PythonDecoratedOperator',
+ '_operator_name': '@task',
'downstream_task_ids': [],
'partial_kwargs': {
'op_args': [],
@@ -2225,6 +2234,7 @@ def test_dummy_operator_serde(is_inherit):
'_is_empty': is_inherit,
'_task_module': 'tests.serialization.test_dag_serialization',
'_task_type': 'MyDummyOperator',
+ '_operator_name': 'MyDummyOperator',
'_outlets': [],
'_inlets': [],
'downstream_task_ids': [],
diff --git a/tests/test_utils/mock_operators.py
b/tests/test_utils/mock_operators.py
index c21393fd62..d9b5970b2e 100644
--- a/tests/test_utils/mock_operators.py
+++ b/tests/test_utils/mock_operators.py
@@ -100,6 +100,7 @@ class CustomOpLink(BaseOperatorLink):
class CustomOperator(BaseOperator):
template_fields = ['bash_command']
+ custom_operator_name = '@custom'
@property
def operator_extra_links(self):