This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new dc510d00f42 Add operators for AspectType resource (#46240)
dc510d00f42 is described below
commit dc510d00f428cf1a77238666cb5202b71ffb96cb
Author: VladaZakharova <[email protected]>
AuthorDate: Sat Feb 1 13:12:49 2025 +0100
Add operators for AspectType resource (#46240)
Co-authored-by: Ulada Zakharava <[email protected]>
---
.../operators/cloud/dataplex.rst | 84 ++++
docs/spelling_wordlist.txt | 2 +
.../providers/google/cloud/hooks/dataplex.py | 196 ++++++++++
.../providers/google/cloud/links/dataplex.py | 49 +++
.../providers/google/cloud/operators/dataplex.py | 424 ++++++++++++++++++++-
.../src/airflow/providers/google/provider.yaml | 2 +
.../tests/google/cloud/hooks/test_dataplex.py | 105 +++++
.../tests/google/cloud/links/test_dataplex.py | 57 ++-
.../tests/google/cloud/operators/test_dataplex.py | 149 +++++++-
.../cloud/dataplex/example_dataplex_catalog.py | 74 ++++
10 files changed, 1137 insertions(+), 5 deletions(-)
diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
b/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
index a8bd9acde56..fc45b5b95bc 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
@@ -595,3 +595,87 @@ use
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogUp
:dedent: 4
:start-after: [START howto_operator_dataplex_catalog_update_entry_type]
:end-before: [END howto_operator_dataplex_catalog_update_entry_type]
+
+.. _howto/operator:DataplexCatalogCreateAspectTypeOperator:
+
+Create an AspectType
+--------------------
+
+To create an Aspect Type in a specific location in Dataplex Catalog you can
+use
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogCreateAspectTypeOperator`
+For more information about the available fields to pass when creating an
Aspect Type, visit `Aspect Type resource configuration
<https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.aspectTypes#AspectType>`__.
+
+A simple Aspect Type configuration can look as follows:
+
+.. exampleinclude::
/../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
+ :language: python
+ :dedent: 0
+ :start-after: [START howto_dataplex_aspect_type_configuration]
+ :end-before: [END howto_dataplex_aspect_type_configuration]
+
+With this configuration you can create an Aspect Type resource:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogCreateAspectTypeOperator`
+
+.. exampleinclude::
/../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_dataplex_catalog_create_aspect_type]
+ :end-before: [END howto_operator_dataplex_catalog_create_aspect_type]
+
+.. _howto/operator:DataplexCatalogDeleteAspectTypeOperator:
+
+Delete an AspectType
+--------------------
+
+To delete an Aspect Type in a specific location in Dataplex Catalog you can
+use
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogDeleteAspectTypeOperator`
+
+.. exampleinclude::
/../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_dataplex_catalog_delete_aspect_type]
+ :end-before: [END howto_operator_dataplex_catalog_delete_aspect_type]
+
+.. _howto/operator:DataplexCatalogListAspectTypesOperator:
+
+List AspectTypes
+----------------
+
+To list all Aspect Types in a specific location in Dataplex Catalog you can
+use
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogListAspectTypesOperator`.
+This operator also supports filtering and ordering the result of the operation.
+
+.. exampleinclude::
/../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_dataplex_catalog_list_aspect_types]
+ :end-before: [END howto_operator_dataplex_catalog_list_aspect_types]
+
+.. _howto/operator:DataplexCatalogGetAspectTypeOperator:
+
+Get an AspectType
+-----------------
+
+To retrieve an Aspect Type in a specific location in Dataplex Catalog you can
+use
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogGetAspectTypeOperator`
+
+.. exampleinclude::
/../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_dataplex_catalog_get_aspect_type]
+ :end-before: [END howto_operator_dataplex_catalog_get_aspect_type]
+
+.. _howto/operator:DataplexCatalogUpdateAspectTypeOperator:
+
+Update an AspectType
+--------------------
+
+To update an Aspect Type in a specific location in Dataplex Catalog you can
+use
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogUpdateAspectTypeOperator`
+
+.. exampleinclude::
/../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_dataplex_catalog_update_aspect_type]
+ :end-before: [END howto_operator_dataplex_catalog_update_aspect_type]
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 2b35ed5bc2d..eeb1978738d 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -91,6 +91,8 @@ asciiart
asctime
asend
asia
+AspectType
+AspectTypes
assertEqualIgnoreMultipleSpaces
AssetEvent
AssetEvents
diff --git a/providers/src/airflow/providers/google/cloud/hooks/dataplex.py
b/providers/src/airflow/providers/google/cloud/hooks/dataplex.py
index df6a4414284..28f85969006 100644
--- a/providers/src/airflow/providers/google/cloud/hooks/dataplex.py
+++ b/providers/src/airflow/providers/google/cloud/hooks/dataplex.py
@@ -32,6 +32,7 @@ from google.cloud.dataplex_v1 import (
)
from google.cloud.dataplex_v1.services.catalog_service import
CatalogServiceClient
from google.cloud.dataplex_v1.types import (
+ AspectType,
Asset,
DataScan,
DataScanJob,
@@ -56,6 +57,7 @@ if TYPE_CHECKING:
from google.api_core.retry import Retry
from google.api_core.retry_async import AsyncRetry
from google.cloud.dataplex_v1.services.catalog_service.pagers import (
+ ListAspectTypesPager,
ListEntryGroupsPager,
ListEntryTypesPager,
)
@@ -138,6 +140,78 @@ class DataplexHook(GoogleBaseHook):
error = operation.exception(timeout=timeout)
raise AirflowException(error)
@GoogleBaseHook.fallback_to_default_project_id
def create_aspect_type(
    self,
    location: str,
    aspect_type_id: str,
    aspect_type_configuration: AspectType | dict,
    project_id: str = PROVIDE_PROJECT_ID,
    validate_only: bool = False,
    retry: Retry | _MethodDefault = DEFAULT,
    timeout: float | None = None,
    metadata: Sequence[tuple[str, str]] = (),
) -> Operation:
    """
    Create an AspectType resource.

    :param location: Required. The ID of the Google Cloud location that the task belongs to.
    :param aspect_type_id: Required. AspectType identifier.
    :param aspect_type_configuration: Required. AspectType configuration body.
    :param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
    :param validate_only: Optional. If set, performs request validation, but does not actually execute
        the create request.
    :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
        will not be retried.
    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Optional. Additional metadata that is provided to the method.
    :return: The value returned by the Catalog client's ``create_aspect_type`` call (annotated as an
        ``Operation``; callers wait on it with ``wait_for_operation``).
    """
    client = self.get_dataplex_catalog_client()
    return client.create_aspect_type(
        request={
            # The new AspectType is created under the location-level parent path.
            "parent": client.common_location_path(project_id, location),
            "aspect_type_id": aspect_type_id,
            "aspect_type": aspect_type_configuration,
            "validate_only": validate_only,
        },
        retry=retry,
        timeout=timeout,
        metadata=metadata,
    )
+
@GoogleBaseHook.fallback_to_default_project_id
def get_aspect_type(
    self,
    location: str,
    aspect_type_id: str,
    project_id: str = PROVIDE_PROJECT_ID,
    retry: Retry | _MethodDefault = DEFAULT,
    timeout: float | None = None,
    metadata: Sequence[tuple[str, str]] = (),
) -> AspectType:
    """
    Get an AspectType resource.

    The resource is addressed by its full name, built from the project, location and AspectType ID.

    :param location: Required. The ID of the Google Cloud location that the task belongs to.
    :param aspect_type_id: Required. AspectType identifier.
    :param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
    :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
        will not be retried.
    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Optional. Additional metadata that is provided to the method.
    :return: The AspectType returned by the Catalog client.
    """
    catalog_client = self.get_dataplex_catalog_client()
    resource_name = catalog_client.aspect_type_path(project_id, location, aspect_type_id)
    return catalog_client.get_aspect_type(
        request={"name": resource_name},
        retry=retry,
        timeout=timeout,
        metadata=metadata,
    )
+
@GoogleBaseHook.fallback_to_default_project_id
def create_entry_type(
self,
@@ -210,6 +284,128 @@ class DataplexHook(GoogleBaseHook):
metadata=metadata,
)
@GoogleBaseHook.fallback_to_default_project_id
def delete_aspect_type(
    self,
    location: str,
    aspect_type_id: str,
    project_id: str = PROVIDE_PROJECT_ID,
    retry: Retry | _MethodDefault = DEFAULT,
    timeout: float | None = None,
    metadata: Sequence[tuple[str, str]] = (),
) -> Operation:
    """
    Delete an AspectType resource.

    The target is identified by its full resource name, derived from project, location and ID.

    :param location: Required. The ID of the Google Cloud location that the task belongs to.
    :param aspect_type_id: Required. AspectType identifier.
    :param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
    :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
        will not be retried.
    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Optional. Additional metadata that is provided to the method.
    :return: The value returned by the Catalog client's ``delete_aspect_type`` call (annotated as an
        ``Operation``).
    """
    catalog_client = self.get_dataplex_catalog_client()
    target_name = catalog_client.aspect_type_path(project_id, location, aspect_type_id)
    delete_request = {"name": target_name}
    return catalog_client.delete_aspect_type(
        request=delete_request,
        retry=retry,
        timeout=timeout,
        metadata=metadata,
    )
+
@GoogleBaseHook.fallback_to_default_project_id
def list_aspect_types(
    self,
    location: str,
    filter_by: str | None = None,
    order_by: str | None = None,
    page_size: int | None = None,
    page_token: str | None = None,
    project_id: str = PROVIDE_PROJECT_ID,
    retry: Retry | _MethodDefault = DEFAULT,
    timeout: float | None = None,
    metadata: Sequence[tuple[str, str]] = (),
) -> ListAspectTypesPager:
    """
    List AspectTypes resources from specific location.

    :param location: Required. The ID of the Google Cloud location that the task belongs to.
    :param filter_by: Optional. Filter to apply on the list results.
    :param order_by: Optional. Fields to order the results by.
    :param page_size: Optional. Maximum number of AspectTypes to return on one page.
    :param page_token: Optional. Token to retrieve the next page of results.
    :param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
    :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
        will not be retried.
    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Optional. Additional metadata that is provided to the method.
    :return: The pager returned by the Catalog client's ``list_aspect_types`` call.
    """
    client = self.get_dataplex_catalog_client()
    return client.list_aspect_types(
        request={
            "parent": client.common_location_path(project_id, location),
            # The API field is named ``filter``; ``filter_by`` avoids shadowing the builtin.
            "filter": filter_by,
            "order_by": order_by,
            "page_size": page_size,
            "page_token": page_token,
        },
        retry=retry,
        timeout=timeout,
        metadata=metadata,
    )
+
@GoogleBaseHook.fallback_to_default_project_id
def update_aspect_type(
    self,
    location: str,
    aspect_type_id: str,
    aspect_type_configuration: dict | AspectType,
    project_id: str = PROVIDE_PROJECT_ID,
    update_mask: list[str] | FieldMask | None = None,
    validate_only: bool | None = False,
    retry: Retry | _MethodDefault = DEFAULT,
    timeout: float | None = None,
    metadata: Sequence[tuple[str, str]] = (),
) -> Operation:
    """
    Update an AspectType resource.

    :param location: Required. The ID of the Google Cloud location that the task belongs to.
    :param aspect_type_id: Required. ID of the AspectType to update.
    :param aspect_type_configuration: Required. The updated configuration body of the AspectType.
        Any ``name`` it contains is replaced by the name built from project, location and ID.
    :param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
    :param update_mask: Optional. Names of fields whose values to overwrite on the AspectType.
        A plain list of field paths is wrapped into a ``FieldMask``.
        If this parameter is absent or empty, all modifiable fields are overwritten. If such
        fields are non-required and omitted in the request body, their values are emptied.
    :param validate_only: Optional. The service validates the request without performing any mutations.
    :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
        will not be retried.
    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Optional. Additional metadata that is provided to the method.
    :return: The value returned by the Catalog client's ``update_aspect_type`` call (annotated as an
        ``Operation``).
    """
    client = self.get_dataplex_catalog_client()
    # Work on a copy (or a dict conversion of the proto) so the caller's configuration is not mutated.
    _aspect_type = (
        deepcopy(aspect_type_configuration)
        if isinstance(aspect_type_configuration, dict)
        else AspectType.to_dict(aspect_type_configuration)
    )
    # The resource name always comes from the explicit arguments.
    _aspect_type["name"] = client.aspect_type_path(project_id, location, aspect_type_id)
    # Accept a plain list of field paths (including list subclasses) for convenience.
    if isinstance(update_mask, list):
        update_mask = FieldMask(paths=update_mask)
    return client.update_aspect_type(
        request={
            "aspect_type": _aspect_type,
            "update_mask": update_mask,
            "validate_only": validate_only,
        },
        retry=retry,
        timeout=timeout,
        metadata=metadata,
    )
+
@GoogleBaseHook.fallback_to_default_project_id
def delete_entry_type(
self,
diff --git a/providers/src/airflow/providers/google/cloud/links/dataplex.py
b/providers/src/airflow/providers/google/cloud/links/dataplex.py
index db9bba40276..8f30527c89b 100644
--- a/providers/src/airflow/providers/google/cloud/links/dataplex.py
+++ b/providers/src/airflow/providers/google/cloud/links/dataplex.py
@@ -39,6 +39,10 @@ DATAPLEX_CATALOG_ENTRY_TYPE_LINK = (
"/dataplex/projects/{project_id}/locations/{location}/entryTypes/{entry_type_id}?project={project_id}"
)
DATAPLEX_CATALOG_ENTRY_TYPES_LINK =
"/dataplex/catalog/entry-types?project={project_id}"
# URL templates for Dataplex Catalog AspectType links: a single AspectType and the AspectTypes list.
DATAPLEX_CATALOG_ASPECT_TYPE_LINK = (
    "/dataplex/projects/{project_id}/locations/{location}"
    "/aspectTypes/{aspect_type_id}?project={project_id}"
)
DATAPLEX_CATALOG_ASPECT_TYPES_LINK = (
    "/dataplex/catalog/aspect-types?project={project_id}"
)
class DataplexTaskLink(BaseGoogleLink):
@@ -199,3 +203,48 @@ class DataplexCatalogEntryTypesLink(BaseGoogleLink):
"project_id": task_instance.project_id,
},
)
+
+
class DataplexCatalogAspectTypeLink(BaseGoogleLink):
    """
    Helper class for constructing Dataplex Catalog AspectType link.

    The link is rendered from ``DATAPLEX_CATALOG_ASPECT_TYPE_LINK`` using the AspectType ID,
    location and project ID that :meth:`persist` stores in XCom.
    """

    name = "Dataplex Catalog AspectType"
    key = "dataplex_catalog_aspect_type_key"
    format_str = DATAPLEX_CATALOG_ASPECT_TYPE_LINK

    @staticmethod
    def persist(context: Context, task_instance):
        """Push the AspectType ID, location and project ID to XCom under this link's key."""
        link_params = {
            "aspect_type_id": task_instance.aspect_type_id,
            "location": task_instance.location,
            "project_id": task_instance.project_id,
        }
        task_instance.xcom_push(
            context=context,
            key=DataplexCatalogAspectTypeLink.key,
            value=link_params,
        )
+
+
class DataplexCatalogAspectTypesLink(BaseGoogleLink):
    """
    Helper class for constructing Dataplex Catalog AspectTypes link.

    The link is rendered from ``DATAPLEX_CATALOG_ASPECT_TYPES_LINK`` using the location and
    project ID that :meth:`persist` stores in XCom.
    """

    name = "Dataplex Catalog AspectTypes"
    key = "dataplex_catalog_aspect_types_key"
    format_str = DATAPLEX_CATALOG_ASPECT_TYPES_LINK

    @staticmethod
    def persist(context: Context, task_instance):
        """Push the location and project ID to XCom under this link's key."""
        link_params = {
            "location": task_instance.location,
            "project_id": task_instance.project_id,
        }
        task_instance.xcom_push(
            context=context,
            key=DataplexCatalogAspectTypesLink.key,
            value=link_params,
        )
diff --git a/providers/src/airflow/providers/google/cloud/operators/dataplex.py
b/providers/src/airflow/providers/google/cloud/operators/dataplex.py
index ab351b36a28..f442ba7172c 100644
--- a/providers/src/airflow/providers/google/cloud/operators/dataplex.py
+++ b/providers/src/airflow/providers/google/cloud/operators/dataplex.py
@@ -40,12 +40,14 @@ from google.api_core.exceptions import AlreadyExists,
GoogleAPICallError, NotFou
from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
from google.api_core.retry import Retry, exponential_sleep_generator
from google.cloud.dataplex_v1.types import (
+ AspectType,
Asset,
DataScan,
DataScanJob,
EntryGroup,
EntryType,
Lake,
+ ListAspectTypesResponse,
ListEntryGroupsResponse,
ListEntryTypesResponse,
Task,
@@ -56,6 +58,8 @@ from googleapiclient.errors import HttpError
from airflow.configuration import conf
from airflow.providers.google.cloud.hooks.dataplex import
AirflowDataQualityScanException, DataplexHook
from airflow.providers.google.cloud.links.dataplex import (
+ DataplexCatalogAspectTypeLink,
+ DataplexCatalogAspectTypesLink,
DataplexCatalogEntryGroupLink,
DataplexCatalogEntryGroupsLink,
DataplexCatalogEntryTypeLink,
@@ -2828,7 +2832,7 @@ class
DataplexCatalogListEntryTypesOperator(DataplexCatalogBaseOperator):
:param filter_by: Optional. Filter to apply on the list results.
:param order_by: Optional. Fields to order the results by.
- :param page_size: Optional. Maximum number of EntryGroups to return on the
page.
+ :param page_size: Optional. Maximum number of EntryTypes to return on the
page.
:param page_token: Optional. Token to retrieve the next page of results.
:param project_id: Required. The ID of the Google Cloud project where the
service is used.
:param location: Required. The ID of the Google Cloud region where the
service is used.
@@ -2887,7 +2891,7 @@ class
DataplexCatalogListEntryTypesOperator(DataplexCatalogBaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
- self.log.info("EntryGroup on page: %s", entry_type_on_page)
+ self.log.info("EntryType on page: %s", entry_type_on_page)
self.xcom_push(
context=context,
key="entry_type_page",
@@ -2896,7 +2900,7 @@ class
DataplexCatalogListEntryTypesOperator(DataplexCatalogBaseOperator):
except Exception as ex:
raise AirflowException(ex)
- # Constructing list to return EntryGroups in readable format
+ # Constructing list to return EntryTypes in readable format
entry_types_list = [
MessageToDict(entry_type._pb, preserving_proto_field_name=True)
for entry_type in next(iter(entry_type_on_page.pages)).entry_types
@@ -2997,3 +3001,417 @@ class
DataplexCatalogUpdateEntryTypeOperator(DataplexCatalogBaseOperator):
if not self.validate_request:
self.log.info("EntryType %s was successfully updated.",
self.entry_type_id)
return result
+
+
class DataplexCatalogCreateAspectTypeOperator(DataplexCatalogBaseOperator):
    """
    Create an AspectType resource.

    If the AspectType already exists, the existing resource is fetched and returned instead of failing.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:DataplexCatalogCreateAspectTypeOperator`

    :param aspect_type_id: Required. AspectType identifier.
    :param aspect_type_configuration: Required. AspectType configuration.
        For more details please see API documentation:
        https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.aspectTypes#AspectType
    :param validate_request: Optional. If set, performs request validation, but does not actually
        execute the request.
    :param project_id: Required. The ID of the Google Cloud project where the service is used.
    :param location: Required. The ID of the Google Cloud region where the service is used.
    :param gcp_conn_id: Optional. The connection ID to use to connect to Google Cloud.
    :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests will not
        be retried.
    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Optional. Additional metadata that is provided to the method.
    :param impersonation_chain: Optional. Service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    """

    template_fields: Sequence[str] = tuple(
        {"aspect_type_id", "aspect_type_configuration"} | set(DataplexCatalogBaseOperator.template_fields)
    )
    operator_extra_links = (DataplexCatalogAspectTypeLink(),)

    def __init__(
        self,
        aspect_type_id: str,
        aspect_type_configuration: AspectType | dict,
        validate_request: bool = False,
        *args,
        **kwargs,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.aspect_type_id = aspect_type_id
        self.aspect_type_configuration = aspect_type_configuration
        self.validate_request = validate_request

    def execute(self, context: Context):
        """
        Create the AspectType, or return the existing one if it already exists.

        :return: The AspectType as a dict. If ``validate_request`` is set and the create call
            succeeds, returns ``None``.
        :raises AirflowException: if the create call or the wait on its operation fails
            (the original error is attached as the cause).
        """
        DataplexCatalogAspectTypeLink.persist(
            context=context,
            task_instance=self,
        )

        if self.validate_request:
            self.log.info("Validating a Create Dataplex Catalog AspectType request.")
        else:
            self.log.info("Creating a Dataplex Catalog AspectType.")

        try:
            operation = self.hook.create_aspect_type(
                aspect_type_id=self.aspect_type_id,
                aspect_type_configuration=self.aspect_type_configuration,
                location=self.location,
                project_id=self.project_id,
                validate_only=self.validate_request,
                retry=self.retry,
                timeout=self.timeout,
                metadata=self.metadata,
            )
            aspect_type = self.hook.wait_for_operation(timeout=self.timeout, operation=operation)
        except AlreadyExists:
            # Fetch the existing resource using the same retry/timeout/metadata as the create call.
            aspect_type = self.hook.get_aspect_type(
                aspect_type_id=self.aspect_type_id,
                location=self.location,
                project_id=self.project_id,
                retry=self.retry,
                timeout=self.timeout,
                metadata=self.metadata,
            )
            self.log.info(
                "Dataplex Catalog AspectType %s already exists.",
                self.aspect_type_id,
            )
            return AspectType.to_dict(aspect_type)
        except Exception as ex:
            raise AirflowException(ex) from ex

        if self.validate_request:
            # Validation-only requests do not create anything, so there is no resource to return.
            return None
        self.log.info("Dataplex Catalog AspectType %s was successfully created.", self.aspect_type_id)
        return AspectType.to_dict(aspect_type)
+
+
class DataplexCatalogGetAspectTypeOperator(DataplexCatalogBaseOperator):
    """
    Get an AspectType resource.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:DataplexCatalogGetAspectTypeOperator`

    :param aspect_type_id: Required. AspectType identifier.
    :param project_id: Required. The ID of the Google Cloud project where the service is used.
    :param location: Required. The ID of the Google Cloud region where the service is used.
    :param gcp_conn_id: Optional. The connection ID to use to connect to Google Cloud.
    :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests will not
        be retried.
    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Optional. Additional metadata that is provided to the method.
    :param impersonation_chain: Optional. Service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    """

    template_fields: Sequence[str] = tuple(
        {"aspect_type_id"} | set(DataplexCatalogBaseOperator.template_fields)
    )
    operator_extra_links = (DataplexCatalogAspectTypeLink(),)

    def __init__(
        self,
        aspect_type_id: str,
        *args,
        **kwargs,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.aspect_type_id = aspect_type_id

    def execute(self, context: Context):
        """
        Fetch the AspectType and return it as a dict.

        :raises AirflowException: if the AspectType is not found or the API call fails
            (the original error is attached as the cause).
        """
        DataplexCatalogAspectTypeLink.persist(
            context=context,
            task_instance=self,
        )
        self.log.info(
            "Retrieving Dataplex Catalog AspectType %s.",
            self.aspect_type_id,
        )
        try:
            aspect_type = self.hook.get_aspect_type(
                aspect_type_id=self.aspect_type_id,
                location=self.location,
                project_id=self.project_id,
                retry=self.retry,
                timeout=self.timeout,
                metadata=self.metadata,
            )
        except NotFound as ex:
            self.log.info(
                "Dataplex Catalog AspectType %s not found.",
                self.aspect_type_id,
            )
            # Wrap the caught instance (not the ``NotFound`` class) so the API error message is kept.
            raise AirflowException(ex) from ex
        except Exception as ex:
            raise AirflowException(ex) from ex

        return AspectType.to_dict(aspect_type)
+
+
class DataplexCatalogListAspectTypesOperator(DataplexCatalogBaseOperator):
    """
    List AspectType resources.

    The raw first-page response is pushed to XCom under ``aspect_type_page``; the operator returns
    the AspectTypes of that first page converted to plain dicts.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:DataplexCatalogListAspectTypesOperator`

    :param filter_by: Optional. Filter to apply on the list results.
    :param order_by: Optional. Fields to order the results by.
    :param page_size: Optional. Maximum number of AspectTypes to return on the page.
    :param page_token: Optional. Token to retrieve the next page of results.
    :param project_id: Required. The ID of the Google Cloud project where the service is used.
    :param location: Required. The ID of the Google Cloud region where the service is used.
    :param gcp_conn_id: Optional. The connection ID to use to connect to Google Cloud.
    :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests will not
        be retried.
    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Optional. Additional metadata that is provided to the method.
    :param impersonation_chain: Optional. Service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    """

    template_fields: Sequence[str] = tuple(DataplexCatalogBaseOperator.template_fields)
    operator_extra_links = (DataplexCatalogAspectTypesLink(),)

    def __init__(
        self,
        page_size: int | None = None,
        page_token: str | None = None,
        filter_by: str | None = None,
        order_by: str | None = None,
        *args,
        **kwargs,
    ) -> None:
        super().__init__(*args, **kwargs)
        # Paging controls.
        self.page_size = page_size
        self.page_token = page_token
        # Result shaping.
        self.filter_by = filter_by
        self.order_by = order_by

    def execute(self, context: Context):
        DataplexCatalogAspectTypesLink.persist(context=context, task_instance=self)
        self.log.info("Listing Dataplex Catalog AspectType from location %s.", self.location)

        try:
            pager = self.hook.list_aspect_types(
                location=self.location,
                project_id=self.project_id,
                page_size=self.page_size,
                page_token=self.page_token,
                filter_by=self.filter_by,
                order_by=self.order_by,
                retry=self.retry,
                timeout=self.timeout,
                metadata=self.metadata,
            )
            self.log.info("AspectType on page: %s", pager)
            first_response_dict = ListAspectTypesResponse.to_dict(pager._response)
            self.xcom_push(context=context, key="aspect_type_page", value=first_response_dict)
        except Exception as ex:
            raise AirflowException(ex)

        # Only the first page is converted; each proto message becomes a plain dict.
        first_page = next(iter(pager.pages))
        return [
            MessageToDict(item._pb, preserving_proto_field_name=True) for item in first_page.aspect_types
        ]
+
+
+class DataplexCatalogUpdateAspectTypeOperator(DataplexCatalogBaseOperator):
+ """
+ Update an AspectType resource.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:DataplexCatalogUpdateAspectTypeOperator`
+
+ :param project_id: Required. The ID of the Google Cloud project that the
task belongs to.
+ :param location: Required. The ID of the Google Cloud region that the task
belongs to.
+ :param update_mask: Optional. Names of fields whose values to overwrite on
an entry group.
+ If this parameter is absent or empty, all modifiable fields are
overwritten. If such
+ fields are non-required and omitted in the request body, their values
are emptied.
+ :param aspect_type_id: Required. ID of the AspectType to update.
+ :param aspect_type_configuration: Required. The updated configuration body
of the AspectType.
+ For more details please see API documentation:
+
https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.aspectTypes#AspectType
+ :param validate_only: Optional. The service validates the request without
performing any mutations.
+ :param retry: Optional. A retry object used to retry requests. If `None`
is specified, requests
+ will not be retried.
+ :param timeout: Optional. The amount of time, in seconds, to wait for the
request to complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Optional. Additional metadata that is provided to the
method.
+ :param gcp_conn_id: Optional. The connection ID to use when fetching
connection info.
+ :param impersonation_chain: Optional. Service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ """
+
+ template_fields: Sequence[str] = tuple(
+ {"aspect_type_id", "aspect_type_configuration", "update_mask"}
+ | set(DataplexCatalogBaseOperator.template_fields)
+ )
+ operator_extra_links = (DataplexCatalogAspectTypeLink(),)
+
+    def __init__(
+        self,
+        aspect_type_id: str,
+        aspect_type_configuration: dict | AspectType,
+        update_mask: list[str] | FieldMask | None = None,
+        validate_request: bool | None = False,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        # All four values are read back in ``execute``; ``validate_request`` is handed to the
+        # hook as its ``validate_only`` argument.
+        self.aspect_type_id = aspect_type_id
+        self.aspect_type_configuration = aspect_type_configuration
+        self.update_mask = update_mask
+        self.validate_request = validate_request
+
+    def execute(self, context: Context):
+        """
+        Update the AspectType, or only validate the update request when ``validate_request`` is set.
+
+        :return: The updated AspectType as a dict, or ``None`` for a validate-only request.
+        :raises AirflowException: Wrapping any error from the hook (including ``NotFound``).
+        """
+        DataplexCatalogAspectTypeLink.persist(
+            context=context,
+            task_instance=self,
+        )
+
+        if self.validate_request:
+            self.log.info("Validating an Update Dataplex Catalog AspectType request.")
+        else:
+            self.log.info(
+                "Updating Dataplex Catalog AspectType %s.",
+                self.aspect_type_id,
+            )
+        try:
+            operation = self.hook.update_aspect_type(
+                location=self.location,
+                project_id=self.project_id,
+                aspect_type_id=self.aspect_type_id,
+                aspect_type_configuration=self.aspect_type_configuration,
+                update_mask=self.update_mask,
+                validate_only=self.validate_request,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+            aspect_type = self.hook.wait_for_operation(timeout=self.timeout, operation=operation)
+        except NotFound as ex:
+            self.log.info("Specified AspectType was not found.")
+            # Chain the cause so the original Google API traceback stays visible in task logs.
+            raise AirflowException(ex) from ex
+        except Exception as exc:
+            raise AirflowException(exc) from exc
+
+        if self.validate_request:
+            # A validate-only request mutates nothing, so there is no AspectType to return.
+            return None
+        result = AspectType.to_dict(aspect_type)
+        self.log.info("AspectType %s was successfully updated.", self.aspect_type_id)
+        return result
+
+
+class DataplexCatalogDeleteAspectTypeOperator(DataplexCatalogBaseOperator):
+    """
+    Delete an AspectType resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DataplexCatalogDeleteAspectTypeOperator`
+
+    :param aspect_type_id: Required. AspectType identifier.
+    :param project_id: Required. The ID of the Google Cloud project where the service is used.
+    :param location: Required. The ID of the Google Cloud region where the service is used.
+    :param gcp_conn_id: Optional. The connection ID to use to connect to Google Cloud.
+    :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests will not
+        be retried.
+    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :param metadata: Optional. Additional metadata that is provided to the method.
+    :param impersonation_chain: Optional. Service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    template_fields: Sequence[str] = tuple(
+        {"aspect_type_id"} | set(DataplexCatalogBaseOperator.template_fields)
+    )
+
+    def __init__(
+        self,
+        aspect_type_id: str,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.aspect_type_id = aspect_type_id
+
+    def execute(self, context: Context):
+        """
+        Request deletion of the AspectType and wait on the returned operation.
+
+        :raises AirflowException: Wrapping ``NotFound`` or any other error raised by the hook.
+        """
+        self.log.info(
+            "Deleting Dataplex Catalog AspectType %s.",
+            self.aspect_type_id,
+        )
+        try:
+            operation = self.hook.delete_aspect_type(
+                aspect_type_id=self.aspect_type_id,
+                location=self.location,
+                project_id=self.project_id,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+            self.hook.wait_for_operation(timeout=self.timeout, operation=operation)
+        except NotFound as ex:
+            self.log.info(
+                "Dataplex Catalog AspectType %s not found.",
+                self.aspect_type_id,
+            )
+            # Wrap the caught instance (not the NotFound class) so the API error message is kept.
+            raise AirflowException(ex) from ex
+        except Exception as ex:
+            raise AirflowException(ex) from ex
+        return None
diff --git a/providers/src/airflow/providers/google/provider.yaml b/providers/src/airflow/providers/google/provider.yaml
index fb8129f183c..2dbfdeb3ed1 100644
--- a/providers/src/airflow/providers/google/provider.yaml
+++ b/providers/src/airflow/providers/google/provider.yaml
@@ -1203,6 +1203,8 @@ extra-links:
   - airflow.providers.google.cloud.links.dataplex.DataplexTaskLink
   - airflow.providers.google.cloud.links.dataplex.DataplexTasksLink
   - airflow.providers.google.cloud.links.dataplex.DataplexLakeLink
+  - airflow.providers.google.cloud.links.dataplex.DataplexCatalogAspectTypeLink
+  - airflow.providers.google.cloud.links.dataplex.DataplexCatalogAspectTypesLink
   - airflow.providers.google.cloud.links.dataplex.DataplexCatalogEntryGroupLink
   - airflow.providers.google.cloud.links.dataplex.DataplexCatalogEntryGroupsLink
   - airflow.providers.google.cloud.links.dataplex.DataplexCatalogEntryTypeLink
diff --git a/providers/tests/google/cloud/hooks/test_dataplex.py b/providers/tests/google/cloud/hooks/test_dataplex.py
index 39f447e591b..cd8b3812968 100644
--- a/providers/tests/google/cloud/hooks/test_dataplex.py
+++ b/providers/tests/google/cloud/hooks/test_dataplex.py
@@ -56,6 +56,9 @@ ENTRY_GROUP_UPDATED_BODY = {"description": "Some new descr"}
ENTRY_TYPE_ID = "entry-type-id"
ENTRY_TYPE_BODY = {"description": "Some descr"}
ENTRY_TYPE_UPDATED_BODY = {"description": "Some new descr"}
+# AspectType fixtures used by the AspectType hook tests in TestDataplexHook.
+ASPECT_TYPE_ID = "aspect-type-id"
+ASPECT_TYPE_BODY = {"description": "Some descr"}
+ASPECT_TYPE_UPDATED_BODY = {"description": "Some new descr"}
UPDATE_MASK = ["description"]
COMMON_PARENT = f"projects/{PROJECT_ID}/locations/{LOCATION}"
@@ -67,6 +70,7 @@ ASSET_PARENT =
f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/
DATASCAN_PARENT = f"projects/{PROJECT_ID}/locations/{REGION}"
ENTRY_GROUP_PARENT =
f"projects/{PROJECT_ID}/locations/{LOCATION}/entryGroup/{ENTRY_GROUP_ID}"
ENTRY_TYPE_PARENT =
f"projects/{PROJECT_ID}/locations/{LOCATION}/entryType/{ENTRY_TYPE_ID}"
+# Return value stubbed onto the mocked client's aspect_type_path(); the tests only compare it
+# for equality, so the exact path shape is not significant.
+ASPECT_TYPE_PARENT =
f"projects/{PROJECT_ID}/locations/{LOCATION}/aspectType/{ASPECT_TYPE_ID}"
class TestDataplexHook:
@@ -530,3 +534,104 @@ class TestDataplexHook:
timeout=None,
metadata=(),
)
+
+    @mock.patch(DATAPLEX_CATALOG_HOOK_CLIENT)
+    def test_create_aspect_type(self, mock_client):
+        """create_aspect_type sends parent, id, body and validate_only in a single request."""
+        mock_location_path = mock_client.return_value.common_location_path
+        mock_location_path.return_value = COMMON_PARENT
+
+        self.hook.create_aspect_type(
+            project_id=PROJECT_ID,
+            location=LOCATION,
+            aspect_type_id=ASPECT_TYPE_ID,
+            aspect_type_configuration=ASPECT_TYPE_BODY,
+            validate_only=False,
+        )
+
+        expected_request = dict(
+            parent=COMMON_PARENT,
+            aspect_type_id=ASPECT_TYPE_ID,
+            aspect_type=ASPECT_TYPE_BODY,
+            validate_only=False,
+        )
+        mock_client.return_value.create_aspect_type.assert_called_once_with(
+            request=expected_request,
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAPLEX_CATALOG_HOOK_CLIENT)
+    def test_delete_aspect_type(self, mock_client):
+        """delete_aspect_type addresses the resource by the name built with aspect_type_path."""
+        mock_aspect_type_path = mock_client.return_value.aspect_type_path
+        mock_aspect_type_path.return_value = ASPECT_TYPE_PARENT
+
+        self.hook.delete_aspect_type(project_id=PROJECT_ID, location=LOCATION, aspect_type_id=ASPECT_TYPE_ID)
+
+        mock_client.return_value.delete_aspect_type.assert_called_once_with(
+            request=dict(name=ASPECT_TYPE_PARENT),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAPLEX_CATALOG_HOOK_CLIENT)
+    def test_list_aspect_types(self, mock_client):
+        """list_aspect_types maps ``filter_by`` to ``filter`` and sends ``page_token=None`` by default."""
+        mock_location_path = mock_client.return_value.common_location_path
+        mock_location_path.return_value = COMMON_PARENT
+        description_filter = "'description' = 'Some descr'"
+
+        self.hook.list_aspect_types(
+            project_id=PROJECT_ID,
+            location=LOCATION,
+            order_by="name",
+            page_size=2,
+            filter_by=description_filter,
+        )
+
+        mock_client.return_value.list_aspect_types.assert_called_once_with(
+            request=dict(
+                parent=COMMON_PARENT,
+                page_size=2,
+                page_token=None,
+                filter=description_filter,
+                order_by="name",
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAPLEX_CATALOG_HOOK_CLIENT)
+    def test_get_aspect_type(self, mock_client):
+        """get_aspect_type addresses the resource by the name built with aspect_type_path."""
+        mock_aspect_type_path = mock_client.return_value.aspect_type_path
+        mock_aspect_type_path.return_value = ASPECT_TYPE_PARENT
+
+        self.hook.get_aspect_type(project_id=PROJECT_ID, location=LOCATION, aspect_type_id=ASPECT_TYPE_ID)
+
+        mock_client.return_value.get_aspect_type.assert_called_once_with(
+            request=dict(name=ASPECT_TYPE_PARENT),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAPLEX_CATALOG_HOOK_CLIENT)
+    def test_update_aspect_type(self, mock_client):
+        """update_aspect_type sends the body with its resource ``name``, a FieldMask and validate_only."""
+        # Must drive update_aspect_type (not update_entry_type) so the AspectType update path is covered.
+        mock_aspect_type_path = mock_client.return_value.aspect_type_path
+        mock_aspect_type_path.return_value = ASPECT_TYPE_PARENT
+        self.hook.update_aspect_type(
+            project_id=PROJECT_ID,
+            location=LOCATION,
+            aspect_type_id=ASPECT_TYPE_ID,
+            aspect_type_configuration=ASPECT_TYPE_UPDATED_BODY,
+            update_mask=UPDATE_MASK,
+            validate_only=False,
+        )
+
+        mock_client.return_value.update_aspect_type.assert_called_once_with(
+            request=dict(
+                aspect_type={**ASPECT_TYPE_UPDATED_BODY, "name": ASPECT_TYPE_PARENT},
+                update_mask=FieldMask(paths=UPDATE_MASK),
+                validate_only=False,
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
diff --git a/providers/tests/google/cloud/links/test_dataplex.py b/providers/tests/google/cloud/links/test_dataplex.py
index 7b5fc9486aa..23859fbde8c 100644
--- a/providers/tests/google/cloud/links/test_dataplex.py
+++ b/providers/tests/google/cloud/links/test_dataplex.py
@@ -20,6 +20,8 @@ from __future__ import annotations
import pytest
from airflow.providers.google.cloud.links.dataplex import (
+ DataplexCatalogAspectTypeLink,
+ DataplexCatalogAspectTypesLink,
DataplexCatalogEntryGroupLink,
DataplexCatalogEntryGroupsLink,
DataplexCatalogEntryTypeLink,
@@ -29,8 +31,10 @@ from airflow.providers.google.cloud.links.dataplex import (
DataplexTasksLink,
)
from airflow.providers.google.cloud.operators.dataplex import (
+ DataplexCatalogCreateAspectTypeOperator,
DataplexCatalogCreateEntryGroupOperator,
DataplexCatalogCreateEntryTypeOperator,
+ DataplexCatalogGetAspectTypeOperator,
DataplexCatalogGetEntryGroupOperator,
DataplexCatalogGetEntryTypeOperator,
DataplexCreateLakeOperator,
@@ -45,7 +49,10 @@ TEST_ENTRY_GROUP_ID_BODY = {"description": "some description"}
TEST_ENTRY_GROUPS_ID = "test-entry-groups-id"
TEST_ENTRY_TYPE_ID = "test-entry-type-id"
TEST_ENTRY_TYPE_ID_BODY = {"description": "some description"}
-TEST_ENTRY_TYPES_ID = "test-entry-groups-id"
+TEST_ENTRY_TYPES_ID = "test-entry-types-id"
+# AspectType fixtures for the AspectType link tests.
+# NOTE(review): TEST_ASPECT_TYPES_ID is not referenced by the AspectType link tests in this change.
+TEST_ASPECT_TYPE_ID = "test-aspect-type-id"
+TEST_ASPECT_TYPE_ID_BODY = {"description": "some description"}
+TEST_ASPECT_TYPES_ID = "test-aspect-types-id"
TEST_TASK_ID = "test-task-id"
TEST_TASKS_ID = "test-tasks-id"
TEST_LAKE_ID = "test-lake-id"
@@ -76,6 +83,13 @@ EXPECTED_DATAPLEX_TASK_LINK = (
EXPECTED_DATAPLEX_TASKS_LINK = (
DATAPLEX_BASE_LINK +
f"process/tasks?project={TEST_PROJECT_ID}&qLake={TEST_LAKE_ID}.{TEST_LOCATION}"
)
+# URL expected from DataplexCatalogAspectTypeLink for a single AspectType.
+EXPECTED_DATAPLEX_CATALOG_ASPECT_TYPE_LINK = (
+ DATAPLEX_BASE_LINK
+ +
f"projects/{TEST_PROJECT_ID}/locations/{TEST_LOCATION}/aspectTypes/{TEST_ASPECT_TYPE_ID}?project={TEST_PROJECT_ID}"
+)
+# URL expected from DataplexCatalogAspectTypesLink (the AspectType list page).
+EXPECTED_DATAPLEX_CATALOG_ASPECT_TYPES_LINK = (
+ DATAPLEX_BASE_LINK + f"catalog/aspect-types?project={TEST_PROJECT_ID}"
+)
class TestDataplexTaskLink:
@@ -221,3 +235,44 @@ class TestDataplexCatalogEntryTypesLink:
link.persist(context={"ti": ti}, task_instance=ti.task)
actual_url = link.get_link(operator=ti.task, ti_key=ti.key)
assert actual_url == expected_url
+
+
+class TestDataplexCatalogAspectTypeLink:
+    """DataplexCatalogAspectTypeLink resolves to EXPECTED_DATAPLEX_CATALOG_ASPECT_TYPE_LINK."""
+
+    @pytest.mark.db_test
+    def test_get_link(self, create_task_instance_of_operator, session):
+        ti = create_task_instance_of_operator(
+            DataplexCatalogGetAspectTypeOperator,
+            dag_id="test_link_dag",
+            task_id="test_link_task",
+            location=TEST_LOCATION,
+            aspect_type_id=TEST_ASPECT_TYPE_ID,
+            project_id=TEST_PROJECT_ID,
+        )
+        session.add(ti)
+        session.commit()
+
+        link = DataplexCatalogAspectTypeLink()
+        link.persist(context={"ti": ti}, task_instance=ti.task)
+
+        assert link.get_link(operator=ti.task, ti_key=ti.key) == EXPECTED_DATAPLEX_CATALOG_ASPECT_TYPE_LINK
+
+
+class TestDataplexCatalogAspectTypesLink:
+    """DataplexCatalogAspectTypesLink resolves to EXPECTED_DATAPLEX_CATALOG_ASPECT_TYPES_LINK."""
+
+    @pytest.mark.db_test
+    def test_get_link(self, create_task_instance_of_operator, session):
+        ti = create_task_instance_of_operator(
+            DataplexCatalogCreateAspectTypeOperator,
+            dag_id="test_link_dag",
+            task_id="test_link_task",
+            location=TEST_LOCATION,
+            aspect_type_id=TEST_ASPECT_TYPE_ID,
+            aspect_type_configuration=TEST_ASPECT_TYPE_ID_BODY,
+            project_id=TEST_PROJECT_ID,
+        )
+        session.add(ti)
+        session.commit()
+
+        link = DataplexCatalogAspectTypesLink()
+        link.persist(context={"ti": ti}, task_instance=ti.task)
+
+        assert link.get_link(operator=ti.task, ti_key=ti.key) == EXPECTED_DATAPLEX_CATALOG_ASPECT_TYPES_LINK
diff --git a/providers/tests/google/cloud/operators/test_dataplex.py b/providers/tests/google/cloud/operators/test_dataplex.py
index ef5faa4637e..1f47f04fd2b 100644
--- a/providers/tests/google/cloud/operators/test_dataplex.py
+++ b/providers/tests/google/cloud/operators/test_dataplex.py
@@ -20,8 +20,14 @@ from unittest import mock
import pytest
from google.api_core.gapic_v1.method import DEFAULT
-from google.cloud.dataplex_v1.services.catalog_service.pagers import ListEntryGroupsPager, ListEntryTypesPager
+from google.cloud.dataplex_v1.services.catalog_service.pagers import (
+ ListAspectTypesPager,
+ ListEntryGroupsPager,
+ ListEntryTypesPager,
+)
from google.cloud.dataplex_v1.types import (
+ ListAspectTypesRequest,
+ ListAspectTypesResponse,
ListEntryGroupsRequest,
ListEntryGroupsResponse,
ListEntryTypesRequest,
@@ -30,12 +36,16 @@ from google.cloud.dataplex_v1.types import (
from airflow.exceptions import TaskDeferred
from airflow.providers.google.cloud.operators.dataplex import (
+ DataplexCatalogCreateAspectTypeOperator,
DataplexCatalogCreateEntryGroupOperator,
DataplexCatalogCreateEntryTypeOperator,
+ DataplexCatalogDeleteAspectTypeOperator,
DataplexCatalogDeleteEntryGroupOperator,
DataplexCatalogDeleteEntryTypeOperator,
+ DataplexCatalogGetAspectTypeOperator,
DataplexCatalogGetEntryGroupOperator,
DataplexCatalogGetEntryTypeOperator,
+ DataplexCatalogListAspectTypesOperator,
DataplexCatalogListEntryGroupsOperator,
DataplexCatalogListEntryTypesOperator,
DataplexCreateAssetOperator,
@@ -68,6 +78,7 @@ ZONE_STR = "airflow.providers.google.cloud.operators.dataplex.Zone"
ASSET_STR = "airflow.providers.google.cloud.operators.dataplex.Asset"
ENTRY_GROUP_STR = "airflow.providers.google.cloud.operators.dataplex.EntryGroup"
ENTRY_TYPE_STR = "airflow.providers.google.cloud.operators.dataplex.EntryType"
+# mock.patch target for the AspectType name used inside the operators module.
+ASPECT_TYPE_STR =
"airflow.providers.google.cloud.operators.dataplex.AspectType"
PROJECT_ID = "project-id"
REGION = "region"
@@ -91,6 +102,7 @@ ZONE_ID = "test_zone_id"
JOB_ID = "test_job_id"
ENTRY_GROUP_NAME = "test_entry_group"
ENTRY_TYPE_NAME = "test_entry_type"
+# AspectType id passed to every AspectType operator test.
+ASPECT_TYPE_NAME = "test_aspect_type"
class TestDataplexCreateTaskOperator:
@@ -1023,3 +1035,138 @@ class TestDataplexCatalogListEntryTypesOperator:
timeout=None,
metadata=(),
)
+
+
+class TestDataplexCatalogCreateAspectTypeOperator:
+    """DataplexCatalogCreateAspectTypeOperator forwards its arguments to ``create_aspect_type``."""
+
+    @mock.patch(ASPECT_TYPE_STR)
+    @mock.patch(HOOK_STR)
+    def test_execute(self, hook_mock, aspect_type_mock):
+        aspect_type_mock.return_value.to_dict.return_value = None
+        hook_mock.return_value.wait_for_operation.return_value = None
+        op = DataplexCatalogCreateAspectTypeOperator(
+            task_id="create_task",
+            project_id=PROJECT_ID,
+            location=REGION,
+            aspect_type_id=ASPECT_TYPE_NAME,
+            aspect_type_configuration=BODY,
+            validate_request=None,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+
+        op.execute(context=mock.MagicMock())
+
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.create_aspect_type.assert_called_once_with(
+            aspect_type_id=ASPECT_TYPE_NAME,
+            aspect_type_configuration=BODY,
+            location=REGION,
+            project_id=PROJECT_ID,
+            validate_only=None,
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+
+class TestDataplexCatalogGetAspectTypeOperator:
+    """DataplexCatalogGetAspectTypeOperator forwards its arguments to ``get_aspect_type``."""
+
+    @mock.patch(ASPECT_TYPE_STR)
+    @mock.patch(HOOK_STR)
+    def test_execute(self, hook_mock, aspect_type_mock):
+        op = DataplexCatalogGetAspectTypeOperator(
+            project_id=PROJECT_ID,
+            location=REGION,
+            aspect_type_id=ASPECT_TYPE_NAME,
+            task_id="get_task",
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        # Configure the stub before execute(), as the Create/List tests do; a stub set after the
+        # call cannot influence the behaviour under test.
+        aspect_type_mock.return_value.to_dict.return_value = None
+        op.execute(context=mock.MagicMock())
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.get_aspect_type.assert_called_once_with(
+            project_id=PROJECT_ID,
+            location=REGION,
+            aspect_type_id=ASPECT_TYPE_NAME,
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+
+class TestDataplexCatalogDeleteAspectTypeOperator:
+    """DataplexCatalogDeleteAspectTypeOperator forwards its arguments to ``delete_aspect_type``."""
+
+    @mock.patch(HOOK_STR)
+    def test_execute(self, hook_mock):
+        hook_mock.return_value.wait_for_operation.return_value = None
+        op = DataplexCatalogDeleteAspectTypeOperator(
+            project_id=PROJECT_ID,
+            location=REGION,
+            aspect_type_id=ASPECT_TYPE_NAME,
+            task_id="delete_task",
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+
+        op.execute(context=mock.MagicMock())
+
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.delete_aspect_type.assert_called_once_with(
+            project_id=PROJECT_ID,
+            location=REGION,
+            aspect_type_id=ASPECT_TYPE_NAME,
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+
+class TestDataplexCatalogListAspectTypesOperator:
+    """DataplexCatalogListAspectTypesOperator forwards default paging/filter arguments to the hook."""
+
+    @mock.patch(ASPECT_TYPE_STR)
+    @mock.patch(HOOK_STR)
+    def test_execute(self, hook_mock, aspect_type_mock):
+        op = DataplexCatalogListAspectTypesOperator(
+            project_id=PROJECT_ID,
+            location=REGION,
+            task_id="list_task",
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        # The hook returns a pager wrapping one response that holds a single AspectType.
+        response_page = ListAspectTypesResponse(
+            aspect_types=[
+                {
+                    "name": "aaa",
+                    "description": "Test Aspect Type 1",
+                    "display_name": "Aspect Type One",
+                }
+            ]
+        )
+        hook_mock.return_value.list_aspect_types.return_value = ListAspectTypesPager(
+            response=response_page,
+            method=mock.MagicMock(),
+            request=ListAspectTypesRequest(parent=""),
+        )
+        aspect_type_mock.return_value.to_dict.return_value = None
+
+        op.execute(context=mock.MagicMock())
+
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.list_aspect_types.assert_called_once_with(
+            project_id=PROJECT_ID,
+            location=REGION,
+            page_size=None,
+            page_token=None,
+            filter_by=None,
+            order_by=None,
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
diff --git a/providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py b/providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
index 6f511d0f8e9..ca585df5b20 100644
--- a/providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
+++ b/providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
@@ -25,14 +25,19 @@ import os
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.dataplex import (
+ DataplexCatalogCreateAspectTypeOperator,
DataplexCatalogCreateEntryGroupOperator,
DataplexCatalogCreateEntryTypeOperator,
+ DataplexCatalogDeleteAspectTypeOperator,
DataplexCatalogDeleteEntryGroupOperator,
DataplexCatalogDeleteEntryTypeOperator,
+ DataplexCatalogGetAspectTypeOperator,
DataplexCatalogGetEntryGroupOperator,
DataplexCatalogGetEntryTypeOperator,
+ DataplexCatalogListAspectTypesOperator,
DataplexCatalogListEntryGroupsOperator,
DataplexCatalogListEntryTypesOperator,
+ DataplexCatalogUpdateAspectTypeOperator,
DataplexCatalogUpdateEntryGroupOperator,
DataplexCatalogUpdateEntryTypeOperator,
)
@@ -56,6 +61,22 @@ ENTRY_TYPE_NAME = f"{DAG_ID}_entry_type_{ENV_ID}".replace("_", "-")
ENTRY_TYPE_BODY = {"display_name": "Display Name", "description": "Some description"}
# [END howto_dataplex_entry_type_configuration]
+# AspectType id derived from the DAG id; "_" is replaced with "-" (presumably because Dataplex
+# resource ids reject underscores, mirroring ENTRY_TYPE_NAME — confirm).
+ASPECT_TYPE_NAME = f"{DAG_ID}_aspect_type_{ENV_ID}".replace("_", "-")
+# NOTE(review): "metadata_template" below is assumed to follow the AspectType metadataTemplate
+# schema (name / type / annotations); confirm against the Dataplex REST reference.
+# [START howto_dataplex_aspect_type_configuration]
+ASPECT_TYPE_BODY = {
+ "display_name": "Sample AspectType",
+ "description": "A simple AspectType for demonstration purposes.",
+ "metadata_template": {
+ "name": "sample_field",
+ "type": "record",
+ "annotations": {
+ "display_name": "Sample Field",
+ "description": "A sample field within the AspectType.",
+ },
+ },
+}
+# [END howto_dataplex_aspect_type_configuration]
+
with DAG(
DAG_ID,
start_date=datetime.datetime(2021, 1, 1),
@@ -84,6 +105,17 @@ with DAG(
)
# [END howto_operator_dataplex_catalog_create_entry_type]
+ # First task of the AspectType chain (create >> get >> update >> delete) wired at the end of the DAG.
+ # [START howto_operator_dataplex_catalog_create_aspect_type]
+ create_aspect_type = DataplexCatalogCreateAspectTypeOperator(
+ task_id="create_aspect_type",
+ project_id=PROJECT_ID,
+ location=GCP_LOCATION,
+ aspect_type_id=ASPECT_TYPE_NAME,
+ aspect_type_configuration=ASPECT_TYPE_BODY,
+ validate_request=False,
+ )
+ # [END howto_operator_dataplex_catalog_create_aspect_type]
+
# [START howto_operator_dataplex_catalog_get_entry_group]
get_entry_group = DataplexCatalogGetEntryGroupOperator(
task_id="get_entry_group",
@@ -102,6 +134,15 @@ with DAG(
)
# [END howto_operator_dataplex_catalog_get_entry_type]
+ # Reads back the AspectType created by create_aspect_type (same ASPECT_TYPE_NAME).
+ # [START howto_operator_dataplex_catalog_get_aspect_type]
+ get_aspect_type = DataplexCatalogGetAspectTypeOperator(
+ task_id="get_aspect_type",
+ project_id=PROJECT_ID,
+ location=GCP_LOCATION,
+ aspect_type_id=ASPECT_TYPE_NAME,
+ )
+ # [END howto_operator_dataplex_catalog_get_aspect_type]
+
# [START howto_operator_dataplex_catalog_list_entry_groups]
list_entry_group = DataplexCatalogListEntryGroupsOperator(
task_id="list_entry_group",
@@ -122,6 +163,16 @@ with DAG(
)
# [END howto_operator_dataplex_catalog_list_entry_types]
+    # Filter on the display_name set in ASPECT_TYPE_BODY so the example can match the AspectType it creates.
+    # [START howto_operator_dataplex_catalog_list_aspect_types]
+    list_aspect_type = DataplexCatalogListAspectTypesOperator(
+        task_id="list_aspect_type",
+        project_id=PROJECT_ID,
+        location=GCP_LOCATION,
+        order_by="name",
+        filter_by='display_name = "Sample AspectType"',
+    )
+    # [END howto_operator_dataplex_catalog_list_aspect_types]
+
# [START howto_operator_dataplex_catalog_update_entry_group]
update_entry_group = DataplexCatalogUpdateEntryGroupOperator(
task_id="update_entry_group",
@@ -144,6 +195,17 @@ with DAG(
)
# [END howto_operator_dataplex_catalog_update_entry_type]
+ # update_mask names the fields to change; here only display_name is listed.
+ # [START howto_operator_dataplex_catalog_update_aspect_type]
+ update_aspect_type = DataplexCatalogUpdateAspectTypeOperator(
+ task_id="update_aspect_type",
+ project_id=PROJECT_ID,
+ location=GCP_LOCATION,
+ aspect_type_id=ASPECT_TYPE_NAME,
+ aspect_type_configuration={"display_name": "Updated Display Name"},
+ update_mask=["display_name"],
+ )
+ # [END howto_operator_dataplex_catalog_update_aspect_type]
+
# [START howto_operator_dataplex_catalog_delete_entry_group]
delete_entry_group = DataplexCatalogDeleteEntryGroupOperator(
task_id="delete_entry_group",
@@ -164,12 +226,24 @@ with DAG(
)
# [END howto_operator_dataplex_catalog_delete_entry_type]
+ # TriggerRule.ALL_DONE, presumably so the AspectType is cleaned up even if earlier tasks fail.
+ # [START howto_operator_dataplex_catalog_delete_aspect_type]
+ delete_aspect_type = DataplexCatalogDeleteAspectTypeOperator(
+ task_id="delete_aspect_type",
+ project_id=PROJECT_ID,
+ location=GCP_LOCATION,
+ aspect_type_id=ASPECT_TYPE_NAME,
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+ # [END howto_operator_dataplex_catalog_delete_aspect_type]
+
(
[
create_entry_group >> get_entry_group >> update_entry_group >>
delete_entry_group,
list_entry_group,
create_entry_type >> get_entry_type >> update_entry_type >>
delete_entry_type,
list_entry_type,
+ # AspectType branch: a create/get/update/delete chain plus list_aspect_type, which has no upstream task.
+ create_aspect_type >> get_aspect_type >> update_aspect_type >>
delete_aspect_type,
+ list_aspect_type,
]
)