This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new b1ee7244f6 Split AWS Links tests into separate modules (#35484) b1ee7244f6 is described below commit b1ee7244f68ed9a7d855c56f2ed8b5cbaa28c834 Author: Andrey Anshin <andrey.ans...@taragol.is> AuthorDate: Tue Nov 7 17:46:42 2023 +0400 Split AWS Links tests into separate modules (#35484) * Split AWS Links tests into separate modules * Add Glue Link(s) tests --- tests/always/test_project_structure.py | 5 - tests/providers/amazon/aws/links/conftest.py | 50 ----- tests/providers/amazon/aws/links/test_base.py | 84 -------- tests/providers/amazon/aws/links/test_base_aws.py | 215 +++++++++++++++++++++ tests/providers/amazon/aws/links/test_batch.py | 65 +++++++ tests/providers/amazon/aws/links/test_emr.py | 77 ++++++++ .../links_test_utils.py => links/test_glue.py} | 20 +- tests/providers/amazon/aws/links/test_links.py | 167 ---------------- tests/providers/amazon/aws/links/test_logs.py | 38 ++++ 9 files changed, 410 insertions(+), 311 deletions(-) diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py index b2e5dc1c9d..9976a9950d 100644 --- a/tests/always/test_project_structure.py +++ b/tests/always/test_project_structure.py @@ -65,11 +65,6 @@ class TestProjectStructure: "tests/providers/amazon/aws/executors/ecs/test_utils.py", "tests/providers/amazon/aws/fs/test_s3.py", "tests/providers/amazon/aws/hooks/test_dms.py", - "tests/providers/amazon/aws/links/test_base_aws.py", - "tests/providers/amazon/aws/links/test_batch.py", - "tests/providers/amazon/aws/links/test_emr.py", - "tests/providers/amazon/aws/links/test_glue.py", - "tests/providers/amazon/aws/links/test_logs.py", "tests/providers/amazon/aws/operators/test_dms.py", "tests/providers/amazon/aws/operators/test_emr.py", "tests/providers/amazon/aws/operators/test_sagemaker.py", diff --git a/tests/providers/amazon/aws/links/conftest.py b/tests/providers/amazon/aws/links/conftest.py deleted file mode 100644 index b2b9a4188c..0000000000 --- a/tests/providers/amazon/aws/links/conftest.py +++ /dev/null @@ -1,50 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -from typing import TYPE_CHECKING - -import pytest - -from tests.providers.amazon.aws.utils.links_test_utils import link_test_operator - -if TYPE_CHECKING: - from airflow.providers.amazon.aws.links.base_aws import BaseAwsLink - - -@pytest.fixture() -def create_task_and_ti_of_op_with_extra_link(create_task_instance_of_operator): - def _create_op_and_ti( - extra_link_class: BaseAwsLink, - *, - dag_id, - task_id, - execution_date=None, - session=None, - **operator_kwargs, - ): - op = link_test_operator(extra_link_class) - return op(task_id=task_id), create_task_instance_of_operator( - link_test_operator(extra_link_class), - dag_id=dag_id, - task_id=task_id, - execution_date=execution_date, - session=session, - **operator_kwargs, - ) - - return _create_op_and_ti diff --git a/tests/providers/amazon/aws/links/test_base.py b/tests/providers/amazon/aws/links/test_base.py deleted file mode 100644 index 42dabde810..0000000000 --- a/tests/providers/amazon/aws/links/test_base.py +++ /dev/null @@ -1,84 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -from unittest.mock import MagicMock - -import pytest - -from airflow.providers.amazon.aws.links.base_aws import BaseAwsLink -from tests.test_utils.mock_operators import MockOperator - -XCOM_KEY = "test_xcom_key" -CUSTOM_KEYS = { - "foo": "bar", - "spam": "egg", -} - - -class SimpleBaseAwsLink(BaseAwsLink): - key = XCOM_KEY - - -class TestBaseAwsLink: - @pytest.mark.parametrize( - "region_name, aws_partition,keywords,expected_value", - [ - ("eu-central-1", "aws", {}, {"region_name": "eu-central-1", "aws_domain": "aws.amazon.com"}), - ("cn-north-1", "aws-cn", {}, {"region_name": "cn-north-1", "aws_domain": "amazonaws.cn"}), - ( - "us-gov-east-1", - "aws-us-gov", - {}, - {"region_name": "us-gov-east-1", "aws_domain": "amazonaws-us-gov.com"}, - ), - ( - "eu-west-1", - "aws", - CUSTOM_KEYS, - {"region_name": "eu-west-1", "aws_domain": "aws.amazon.com", **CUSTOM_KEYS}, - ), - ], - ) - def test_persist(self, region_name, aws_partition, keywords, expected_value): - mock_context = MagicMock() - - SimpleBaseAwsLink.persist( - context=mock_context, - operator=MockOperator(task_id="test_task_id"), - region_name=region_name, - aws_partition=aws_partition, - **keywords, - ) - - ti = mock_context["ti"] - ti.xcom_push.assert_called_once_with( - execution_date=None, - key=XCOM_KEY, - value=expected_value, - ) - - def test_disable_xcom_push(self): - mock_context = MagicMock() - SimpleBaseAwsLink.persist( - context=mock_context, - operator=MockOperator(task_id="test_task_id", do_xcom_push=False), - region_name="eu-east-1", - aws_partition="aws", - ) - ti = mock_context["ti"] - ti.xcom_push.assert_not_called() diff --git a/tests/providers/amazon/aws/links/test_base_aws.py b/tests/providers/amazon/aws/links/test_base_aws.py new file mode 100644 index 0000000000..a8bf17c3db --- /dev/null +++ b/tests/providers/amazon/aws/links/test_base_aws.py @@ -0,0 +1,215 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from abc import abstractmethod +from typing import TYPE_CHECKING, NamedTuple +from unittest.mock import MagicMock + +import pytest + +from airflow.providers.amazon.aws.links.base_aws import BaseAwsLink +from airflow.serialization.serialized_objects import SerializedDAG +from tests.test_utils.mock_operators import MockOperator + +if TYPE_CHECKING: + from airflow.models import TaskInstance + +XCOM_KEY = "test_xcom_key" +CUSTOM_KEYS = { + "foo": "bar", + "spam": "egg", +} +TEST_REGION_NAME = "eu-west-1" +TEST_AWS_PARTITION = "aws" + + +class SimpleBaseAwsLink(BaseAwsLink): + key = XCOM_KEY + + +class TestBaseAwsLink: + @pytest.mark.parametrize( + "region_name, aws_partition,keywords,expected_value", + [ + ("eu-central-1", "aws", {}, {"region_name": "eu-central-1", "aws_domain": "aws.amazon.com"}), + ("cn-north-1", "aws-cn", {}, {"region_name": "cn-north-1", "aws_domain": "amazonaws.cn"}), + ( + "us-gov-east-1", + "aws-us-gov", + {}, + {"region_name": "us-gov-east-1", "aws_domain": "amazonaws-us-gov.com"}, + ), + ( + "eu-west-1", + "aws", + CUSTOM_KEYS, + {"region_name": "eu-west-1", "aws_domain": "aws.amazon.com", **CUSTOM_KEYS}, + ), + ], + ) + def test_persist(self, region_name, aws_partition, keywords, expected_value): + mock_context = MagicMock() + + SimpleBaseAwsLink.persist( + context=mock_context, + operator=MockOperator(task_id="test_task_id"), + region_name=region_name, + aws_partition=aws_partition, + **keywords, + ) + + ti = mock_context["ti"] + ti.xcom_push.assert_called_once_with( + execution_date=None, + key=XCOM_KEY, + value=expected_value, + ) + + def test_disable_xcom_push(self): + mock_context = MagicMock() + SimpleBaseAwsLink.persist( + context=mock_context, + operator=MockOperator(task_id="test_task_id", do_xcom_push=False), + region_name="eu-east-1", + aws_partition="aws", + ) + ti = mock_context["ti"] + ti.xcom_push.assert_not_called() + + +def link_test_operator(*links): + """Helper for create mock operator class with extra links""" + + class LinkTestOperator(MockOperator): + operator_extra_links = tuple(c() for c in links) + + return LinkTestOperator + + +class OperatorAndTi(NamedTuple): + """Helper container for store task and generated task instance.""" + + task: MockOperator + task_instance: TaskInstance + + +@pytest.mark.db_test +@pytest.mark.need_serialized_dag +class BaseAwsLinksTestCase: + """Base class for AWS Provider links tests.""" + + link_class: type[BaseAwsLink] + + @pytest.fixture(autouse=True) + def setup_base_test_case(self, dag_maker, create_task_instance_of_operator): + self.dag_maker = dag_maker + self.ti_maker = create_task_instance_of_operator + + @property + def full_qualname(self) -> str: + return f"{self.link_class.__module__}.{self.link_class.__qualname__}" + + @property + def task_id(self) -> str: + return f"test-{self.link_class.__name__}" + + def create_op_and_ti( + self, + extra_link_class: type[BaseAwsLink], + *, + dag_id, + task_id, + execution_date=None, + session=None, + **operator_kwargs, + ): + """Helper method for generate operator and task instance""" + op = link_test_operator(extra_link_class) + return OperatorAndTi( + task=op(task_id=task_id), + task_instance=self.ti_maker( + op, + dag_id=dag_id, + task_id=task_id, + execution_date=execution_date, + session=session, + **operator_kwargs, + ), + ) + + def assert_extra_link_url( + self, + expected_url: str, + region_name=TEST_REGION_NAME, + aws_partition=TEST_AWS_PARTITION, + **extra_link_kwargs, + ): + """Helper method for create extra link URL from the parameters.""" + task, ti = self.create_op_and_ti(self.link_class, dag_id="test_extra_link", task_id=self.task_id) + + mock_context = MagicMock() + mock_context.__getitem__.side_effect = {"ti": ti}.__getitem__ + + self.link_class.persist( + context=mock_context, + operator=task, + region_name=region_name, + aws_partition=aws_partition, + **extra_link_kwargs, + ) + + error_msg = f"{self.full_qualname!r} should be preserved after execution" + assert ti.task.get_extra_links(ti, self.link_class.name) == expected_url, error_msg + + serialized_dag = self.dag_maker.get_serialized_data() + deserialized_dag = SerializedDAG.from_dict(serialized_dag) + deserialized_task = deserialized_dag.task_dict[self.task_id] + + error_msg = f"{self.full_qualname!r} should be preserved in deserialized tasks after execution" + assert deserialized_task.get_extra_links(ti, self.link_class.name) == expected_url, error_msg + + def test_link_serialize(self): + """Test: Operator links should exist for serialized DAG.""" + self.create_op_and_ti(self.link_class, dag_id="test_link_serialize", task_id=self.task_id) + serialized_dag = self.dag_maker.get_serialized_data() + operator_extra_link = serialized_dag["dag"]["tasks"][0]["_operator_extra_links"] + error_message = "Operator links should exist for serialized DAG" + assert operator_extra_link == [{self.full_qualname: {}}], error_message + + def test_empty_xcom(self): + """Test: Operator links should return empty string if no XCom value.""" + ti = self.create_op_and_ti( + self.link_class, dag_id="test_empty_xcom", task_id=self.task_id + ).task_instance + + serialized_dag = self.dag_maker.get_serialized_data() + deserialized_dag = SerializedDAG.from_dict(serialized_dag) + deserialized_task = deserialized_dag.task_dict[self.task_id] + + assert ( + ti.task.get_extra_links(ti, self.link_class.name) == "" + ), "Operator link should only be added if job id is available in XCom" + + assert ( + deserialized_task.get_extra_links(ti, self.link_class.name) == "" + ), "Operator link should be empty for deserialized task with no XCom push" + + @abstractmethod + def test_extra_link(self, **kwargs): + """Test: Expected URL Link.""" + raise NotImplementedError(f"{type(self).__name__!r} should implement `test_extra_link` test") diff --git a/tests/providers/amazon/aws/links/test_batch.py b/tests/providers/amazon/aws/links/test_batch.py new file mode 100644 index 0000000000..23a11193ba --- /dev/null +++ b/tests/providers/amazon/aws/links/test_batch.py @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from airflow.providers.amazon.aws.links.batch import ( + BatchJobDefinitionLink, + BatchJobDetailsLink, + BatchJobQueueLink, +) +from tests.providers.amazon.aws.links.test_base_aws import BaseAwsLinksTestCase + + +class TestBatchJobDefinitionLink(BaseAwsLinksTestCase): + link_class = BatchJobDefinitionLink + + def test_extra_link(self): + self.assert_extra_link_url( + expected_url=( + "https://console.aws.amazon.com/batch/home" + "?region=eu-west-1#job-definition/detail/arn:fake:jd" + ), + region_name="eu-west-1", + aws_partition="aws", + job_definition_arn="arn:fake:jd", + ) + + +class TestBatchJobDetailsLink(BaseAwsLinksTestCase): + link_class = BatchJobDetailsLink + + def test_extra_link(self): + self.assert_extra_link_url( + expected_url="https://console.amazonaws.cn/batch/home?region=cn-north-1#jobs/detail/fake-id", + region_name="cn-north-1", + aws_partition="aws-cn", + job_id="fake-id", + ) + + +class TestBatchJobQueueLink(BaseAwsLinksTestCase): + link_class = BatchJobQueueLink + + def test_extra_link(self): + self.assert_extra_link_url( + expected_url=( + "https://console.aws.amazon.com/batch/home" "?region=us-east-1#queues/detail/arn:fake:jq" + ), + region_name="us-east-1", + aws_partition="aws", + job_queue_arn="arn:fake:jq", + ) diff --git a/tests/providers/amazon/aws/links/test_emr.py b/tests/providers/amazon/aws/links/test_emr.py new file mode 100644 index 0000000000..59c883362a --- /dev/null +++ b/tests/providers/amazon/aws/links/test_emr.py @@ -0,0 +1,77 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest.mock import MagicMock + +import pytest + +from airflow.providers.amazon.aws.links.emr import EmrClusterLink, EmrLogsLink, get_log_uri +from tests.providers.amazon.aws.links.test_base_aws import BaseAwsLinksTestCase + + +class TestEmrClusterLink(BaseAwsLinksTestCase): + link_class = EmrClusterLink + + def test_extra_link(self): + self.assert_extra_link_url( + expected_url=( + "https://console.aws.amazon.com/emr/home" "?region=us-west-1#/clusterDetails/j-TEST-FLOW-ID" + ), + region_name="us-west-1", + aws_partition="aws", + job_flow_id="j-TEST-FLOW-ID", + ) + + +@pytest.mark.parametrize( + "cluster_info, expected_uri", + [ + pytest.param({"Cluster": {}}, None, id="no-log-uri"), + pytest.param({"Cluster": {"LogUri": "s3://myLogUri/"}}, "myLogUri/", id="has-log-uri"), + ], +) +def test_get_log_uri(cluster_info, expected_uri): + emr_client = MagicMock() + emr_client.describe_cluster.return_value = cluster_info + assert get_log_uri(cluster=None, emr_client=emr_client, job_flow_id="test_job_flow_id") == expected_uri + + +class TestEmrLogsLink(BaseAwsLinksTestCase): + link_class = EmrLogsLink + + def test_extra_link(self): + self.assert_extra_link_url( + expected_url=( + "https://console.aws.amazon.com/s3/buckets/myLogUri/" "?region=eu-west-2&prefix=j-8989898989/" + ), + region_name="eu-west-2", + aws_partition="aws", + log_uri="myLogUri/", + job_flow_id="j-8989898989", + ) + + @pytest.mark.parametrize( + "log_url_extra", + [ + pytest.param({}, id="no-log-uri", marks=pytest.mark.xfail), + pytest.param({"log_uri": None}, id="log-uri-none"), + pytest.param({"log_uri": ""}, id="log-uri-empty"), + ], + ) + def test_missing_log_url(self, log_url_extra: dict): + self.assert_extra_link_url(expected_url="", **log_url_extra) diff --git a/tests/providers/amazon/aws/utils/links_test_utils.py b/tests/providers/amazon/aws/links/test_glue.py similarity index 56% rename from tests/providers/amazon/aws/utils/links_test_utils.py rename to tests/providers/amazon/aws/links/test_glue.py index 9d2d6f9502..5f929cd3e9 100644 --- a/tests/providers/amazon/aws/utils/links_test_utils.py +++ b/tests/providers/amazon/aws/links/test_glue.py @@ -16,11 +16,21 @@ # under the License. from __future__ import annotations -from tests.test_utils.mock_operators import MockOperator +from airflow.providers.amazon.aws.links.glue import GlueJobRunDetailsLink +from tests.providers.amazon.aws.links.test_base_aws import BaseAwsLinksTestCase -def link_test_operator(*links): - class LinkTestOperator(MockOperator): - operator_extra_links = tuple(c() for c in links) +class TestGlueJobRunDetailsLink(BaseAwsLinksTestCase): + link_class = GlueJobRunDetailsLink - return LinkTestOperator + def test_extra_link(self): + self.assert_extra_link_url( + expected_url=( + "https://console.aws.amazon.com/gluestudio/home" + "?region=ap-southeast-2#/job/test_job_name/run/11111" + ), + region_name="ap-southeast-2", + aws_partition="aws", + job_run_id="11111", + job_name="test_job_name", + ) diff --git a/tests/providers/amazon/aws/links/test_links.py b/tests/providers/amazon/aws/links/test_links.py deleted file mode 100644 index e0ccf86d2e..0000000000 --- a/tests/providers/amazon/aws/links/test_links.py +++ /dev/null @@ -1,167 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -from operator import itemgetter -from unittest.mock import MagicMock - -import pytest - -from airflow.providers.amazon.aws.links.batch import ( - BatchJobDefinitionLink, - BatchJobDetailsLink, - BatchJobQueueLink, -) -from airflow.providers.amazon.aws.links.emr import EmrClusterLink, get_log_uri -from airflow.providers.amazon.aws.links.glue import GlueJobRunDetailsLink -from airflow.providers.amazon.aws.links.logs import CloudWatchEventsLink -from airflow.serialization.serialized_objects import SerializedDAG - -TASK_ID = "test_task_id" -REGION_NAME = "eu-west-1" -AWS_PARTITION = "aws" -AWS_DOMAIN = "aws.amazon.com" - - -def _full_qualname(cls) -> str: - return f"{cls.__module__}.{cls.__qualname__}" - - -# List AWS External Link with test cases -# Expected list op tuple(BaseAwsLink, kwargs, expected URL) -AWS_LINKS = [ - ( - BatchJobDefinitionLink, - {"job_definition_arn": "arn:aws:batch:dummy-region:111111111111:job-definition/batch-test:42"}, - f"https://console.{AWS_DOMAIN}/batch/home?region={REGION_NAME}" - f"#job-definition/detail/arn:aws:batch:dummy-region:111111111111:job-definition/batch-test:42", - ), - ( - BatchJobDetailsLink, - {"job_id": "00000000-0000-0000-0000-000000000000"}, - f"https://console.{AWS_DOMAIN}/batch/home?region={REGION_NAME}" - f"#jobs/detail/00000000-0000-0000-0000-000000000000", - ), - ( - BatchJobQueueLink, - {"job_queue_arn": "arn:aws:batch:dummy-region:111111111111:job-queue/test-queue"}, - f"https://console.{AWS_DOMAIN}/batch/home?region={REGION_NAME}" - f"#queues/detail/arn:aws:batch:dummy-region:111111111111:job-queue/test-queue", - ), - ( - EmrClusterLink, - {"job_flow_id": "j-TEST-FLOW-ID"}, - f"https://console.{AWS_DOMAIN}/emr/home?region={REGION_NAME}#/clusterDetails/j-TEST-FLOW-ID", - ), - ( - CloudWatchEventsLink, - { - "awslogs_region": "ap-southeast-2", - "awslogs_group": "/test/logs/group", - "awslogs_stream_name": "test/stream/d56a66bb98a14c4593defa1548686edf", - }, - f"https://console.{AWS_DOMAIN}/cloudwatch/home?region=ap-southeast-2" - f"#logsV2:log-groups/log-group/%2Ftest%2Flogs%2Fgroup" - f"/log-events/test%2Fstream%2Fd56a66bb98a14c4593defa1548686edf", - ), - ( - GlueJobRunDetailsLink, - {"job_run_id": "11111", "job_name": "test_job_name"}, - f"https://console.{AWS_DOMAIN}/gluestudio/home?region={REGION_NAME}#/job/test_job_name/run/11111", - ), -] - - -@pytest.mark.db_test -@pytest.mark.need_serialized_dag -class TestAwsLinks: - @pytest.mark.parametrize("extra_link_class", map(itemgetter(0), AWS_LINKS), ids=_full_qualname) - def test_link_serialize(self, extra_link_class, dag_maker, create_task_and_ti_of_op_with_extra_link): - """Test: Operator links should exist for serialized DAG.""" - create_task_and_ti_of_op_with_extra_link( - extra_link_class, dag_id="test_link_serialize", task_id=TASK_ID - ) - serialized_dag = dag_maker.get_serialized_data() - assert serialized_dag["dag"]["tasks"][0]["_operator_extra_links"] == [ - {f"{_full_qualname(extra_link_class)}": {}} - ], "Operator links should exist for serialized DAG" - - @pytest.mark.parametrize("extra_link_class", map(itemgetter(0), AWS_LINKS), ids=_full_qualname) - def test_empty_xcom(self, extra_link_class, dag_maker, create_task_and_ti_of_op_with_extra_link): - """Test: Operator links should return empty string if no XCom value.""" - _, ti = create_task_and_ti_of_op_with_extra_link( - extra_link_class, dag_id="test_empty_xcom", task_id=TASK_ID - ) - - serialized_dag = dag_maker.get_serialized_data() - deserialized_dag = SerializedDAG.from_dict(serialized_dag) - deserialized_task = deserialized_dag.task_dict[TASK_ID] - - assert ( - ti.task.get_extra_links(ti, extra_link_class.name) == "" - ), "Operator link should only be added if job id is available in XCom" - - assert ( - deserialized_task.get_extra_links(ti, extra_link_class.name) == "" - ), "Operator link should be empty for deserialized task with no XCom push" - - @pytest.mark.parametrize("extra_link_class, extra_link_kwargs, extra_link_expected_url", AWS_LINKS) - def test_extra_link( - self, - extra_link_class, - extra_link_kwargs, - extra_link_expected_url, - dag_maker, - create_task_and_ti_of_op_with_extra_link, - ): - """Test: Expected URL Link.""" - task, ti = create_task_and_ti_of_op_with_extra_link( - extra_link_class, dag_id="test_extra_link", task_id=TASK_ID - ) - - mock_context = MagicMock() - mock_context.__getitem__.side_effect = {"ti": ti}.__getitem__ - - extra_link_class.persist( - context=mock_context, - operator=task, - region_name=REGION_NAME, - aws_partition=AWS_PARTITION, - **extra_link_kwargs, - ) - - assert ( - ti.task.get_extra_links(ti, extra_link_class.name) == extra_link_expected_url - ), f"{_full_qualname(extra_link_class)} should be preserved after execution" - - serialized_dag = dag_maker.get_serialized_data() - deserialized_dag = SerializedDAG.from_dict(serialized_dag) - deserialized_task = deserialized_dag.task_dict[TASK_ID] - - assert ( - deserialized_task.get_extra_links(ti, extra_link_class.name) == extra_link_expected_url - ), f"{_full_qualname(extra_link_class)} should be preserved in deserialized tasks after execution" - - -@pytest.mark.parametrize( - "cluster_info, expected_uri", - (({"Cluster": {}}, None), ({"Cluster": {"LogUri": "s3://myLogUri/"}}, "myLogUri/")), -) -def test_get_log_uri(cluster_info, expected_uri): - emr_client = MagicMock() - emr_client.describe_cluster.return_value = cluster_info - assert get_log_uri(cluster=None, emr_client=emr_client, job_flow_id="test_job_flow_id") == expected_uri diff --git a/tests/providers/amazon/aws/links/test_logs.py b/tests/providers/amazon/aws/links/test_logs.py new file mode 100644 index 0000000000..991a8bc6f0 --- /dev/null +++ b/tests/providers/amazon/aws/links/test_logs.py @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from airflow.providers.amazon.aws.links.logs import CloudWatchEventsLink +from tests.providers.amazon.aws.links.test_base_aws import BaseAwsLinksTestCase + + +class TestCloudWatchEventsLink(BaseAwsLinksTestCase): + link_class = CloudWatchEventsLink + + def test_extra_link(self): + self.assert_extra_link_url( + expected_url=( + "https://console.aws.amazon.com/cloudwatch/home" + "?region=ap-southeast-2#logsV2:log-groups/log-group/%2Ftest%2Flogs%2Fgroup" + "/log-events/test%2Fstream%2Fd56a66bb98a14c4593defa1548686edf" + ), + region_name="us-west-1", + aws_partition="aws", + awslogs_region="ap-southeast-2", + awslogs_group="/test/logs/group", + awslogs_stream_name="test/stream/d56a66bb98a14c4593defa1548686edf", + )