This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e403c74524 Fix import in `get_custom_facets`. (#34122)
e403c74524 is described below
commit e403c74524a980030ba120c3602de0c3dc867d86
Author: Jakub Dardzinski <[email protected]>
AuthorDate: Wed Sep 6 10:41:52 2023 +0200
Fix import in `get_custom_facets`. (#34122)
Signed-off-by: Jakub Dardzinski <[email protected]>
---
airflow/providers/openlineage/plugins/facets.py | 2 +-
tests/providers/openlineage/utils/test_utils.py | 47 +++++++++++++++++++++++++
2 files changed, 48 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/openlineage/plugins/facets.py
b/airflow/providers/openlineage/plugins/facets.py
index 2c301856f7..b50db6c267 100644
--- a/airflow/providers/openlineage/plugins/facets.py
+++ b/airflow/providers/openlineage/plugins/facets.py
@@ -33,7 +33,7 @@ class AirflowMappedTaskRunFacet(BaseFacet):
@classmethod
def from_task_instance(cls, task_instance):
task = task_instance.task
- from airflow.providers.openlineage.utils import get_operator_class
+ from airflow.providers.openlineage.utils.utils import
get_operator_class
return cls(
mapIndex=task_instance.map_index,
diff --git a/tests/providers/openlineage/utils/test_utils.py
b/tests/providers/openlineage/utils/test_utils.py
new file mode 100644
index 0000000000..a2093652f9
--- /dev/null
+++ b/tests/providers/openlineage/utils/test_utils.py
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.decorators import task_group
+from airflow.models.taskinstance import TaskInstance as TI
+from airflow.operators.empty import EmptyOperator
+from airflow.providers.openlineage.plugins.facets import
AirflowMappedTaskRunFacet
+from airflow.providers.openlineage.utils.utils import get_custom_facets
+from airflow.utils import timezone
+
+DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+
+
+def test_get_custom_facets(dag_maker):
+ with dag_maker(dag_id="dag_test_get_custom_facets") as dag:
+
+ @task_group
+ def task_group_op(k):
+ EmptyOperator(task_id="empty_operator")
+
+ task_group_op.expand(k=[0])
+
+ dag_maker.create_dagrun()
+ ti_0 = TI(dag.get_task("task_group_op.empty_operator"),
execution_date=DEFAULT_DATE, map_index=0)
+
+ assert ti_0.map_index == 0
+
+ assert get_custom_facets(ti_0)["airflow_mappedTask"] ==
AirflowMappedTaskRunFacet(
+ mapIndex=0,
+
operatorClass=f"{ti_0.task.operator_class.__module__}.{ti_0.task.operator_class.__name__}",
+ )