This is an automated email from the ASF dual-hosted git repository. kamilbregula pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push: new 9150330 Add Google Cloud Memorystore Memcached Operators (#10121) 9150330 is described below commit 91503308c723b186ce6f4026f2a3e2c21030f6e5 Author: Tanjin Panna <tanjin.pa...@gmail.com> AuthorDate: Wed Oct 21 21:50:40 2020 -0400 Add Google Cloud Memorystore Memcached Operators (#10121) Co-authored-by: Tobiasz Kędzierski <tobiasz.kedzier...@polidea.com> Co-authored-by: Kamil Breguła <mik-...@users.noreply.github.com> --- .../example_dags/example_cloud_memorystore.py | 131 ++++- .../google/cloud/hooks/cloud_memorystore.py | 451 +++++++++++++++ .../google/cloud/operators/cloud_memorystore.py | 616 ++++++++++++++++++++- .../google/cloud/cloud_memorystore_memcached.rst | 158 ++++++ docs/operators-and-hooks-ref.rst | 8 +- setup.py | 1 + .../google/cloud/hooks/test_cloud_memorystore.py | 188 ++++++- .../cloud/operators/test_cloud_memorystore.py | 147 +++++ .../operators/test_cloud_memorystore_system.py | 8 +- 9 files changed, 1687 insertions(+), 21 deletions(-) diff --git a/airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py b/airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py index a3d8c2b..704f02c 100644 --- a/airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py +++ b/airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py @@ -22,6 +22,7 @@ import os from urllib.parse import urlparse from google.cloud.redis_v1.gapic.enums import FailoverInstanceRequest, Instance +from google.cloud.memcache_v1beta2.types import cloud_memcache from airflow import models from airflow.operators.bash import BashOperator @@ -37,15 +38,29 @@ from airflow.providers.google.cloud.operators.cloud_memorystore import ( CloudMemorystoreListInstancesOperator, CloudMemorystoreScaleInstanceOperator, CloudMemorystoreUpdateInstanceOperator, + CloudMemorystoreMemcachedApplyParametersOperator, + CloudMemorystoreMemcachedCreateInstanceOperator, + CloudMemorystoreMemcachedDeleteInstanceOperator, + CloudMemorystoreMemcachedGetInstanceOperator, + CloudMemorystoreMemcachedListInstancesOperator, + CloudMemorystoreMemcachedUpdateInstanceOperator, + CloudMemorystoreMemcachedUpdateParametersOperator, ) from airflow.providers.google.cloud.operators.gcs import GCSBucketCreateAclEntryOperator from airflow.utils import dates GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project") -INSTANCE_NAME = os.environ.get("GCP_MEMORYSTORE_INSTANCE_NAME", "test-memorystore") -INSTANCE_NAME_2 = os.environ.get("GCP_MEMORYSTORE_INSTANCE_NAME2", "test-memorystore-2") -INSTANCE_NAME_3 = os.environ.get("GCP_MEMORYSTORE_INSTANCE_NAME3", "test-memorystore-3") +MEMORYSTORE_REDIS_INSTANCE_NAME = os.environ.get("GCP_MEMORYSTORE_INSTANCE_NAME", "test-memorystoreredis-") +MEMORYSTORE_REDIS_INSTANCE_NAME_2 = os.environ.get( + "GCP_MEMORYSTORE_INSTANCE_NAME2", "test-memorystore-redis-2" +) +MEMORYSTORE_REDIS_INSTANCE_NAME_3 = os.environ.get( + "GCP_MEMORYSTORE_INSTANCE_NAME3", "test-memorystore-redis-3" +) +MEMORYSTORE_MEMCACHED_INSTANCE_NAME = os.environ.get( + "GCP_MEMORYSTORE_INSTANCE_NAME4", "test-memorystore-memcached-1" +) EXPORT_GCS_URL = os.environ.get("GCP_MEMORYSTORE_EXPORT_GCS_URL", "gs://test-memorystore/my-export.rdb") EXPORT_GCS_URL_PARTS = urlparse(EXPORT_GCS_URL) @@ -57,9 +72,13 @@ FIRST_INSTANCE = {"tier": Instance.Tier.BASIC, "memory_size_gb": 1} SECOND_INSTANCE = {"tier": Instance.Tier.STANDARD_HA, "memory_size_gb": 3} +# [START howto_operator_memcached_instance] +MEMCACHED_INSTANCE = {"name": "", "node_count": 1, "node_config": {"cpu_count": 1, "memory_size_mb": 1024}} +# [END howto_operator_memcached_instance] + with models.DAG( - "gcp_cloud_memorystore", + "gcp_cloud_memorystore_redis", schedule_interval=None, # Override to match your needs start_date=dates.days_ago(1), tags=['example'], @@ -68,7 +87,7 @@ with models.DAG( create_instance = CloudMemorystoreCreateInstanceOperator( task_id="create-instance", location="europe-north1", - instance_id=INSTANCE_NAME, + instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME, instance=FIRST_INSTANCE, project_id=GCP_PROJECT_ID, ) @@ -84,7 +103,7 @@ with models.DAG( create_instance_2 = CloudMemorystoreCreateInstanceOperator( task_id="create-instance-2", location="europe-north1", - instance_id=INSTANCE_NAME_2, + instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME_2, instance=SECOND_INSTANCE, project_id=GCP_PROJECT_ID, ) @@ -93,7 +112,7 @@ with models.DAG( get_instance = CloudMemorystoreGetInstanceOperator( task_id="get-instance", location="europe-north1", - instance=INSTANCE_NAME, + instance=MEMORYSTORE_REDIS_INSTANCE_NAME, project_id=GCP_PROJECT_ID, do_xcom_push=True, ) @@ -109,7 +128,7 @@ with models.DAG( failover_instance = CloudMemorystoreFailoverInstanceOperator( task_id="failover-instance", location="europe-north1", - instance=INSTANCE_NAME_2, + instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2, data_protection_mode=FailoverInstanceRequest.DataProtectionMode.LIMITED_DATA_LOSS, project_id=GCP_PROJECT_ID, ) @@ -131,7 +150,7 @@ with models.DAG( update_instance = CloudMemorystoreUpdateInstanceOperator( task_id="update-instance", location="europe-north1", - instance_id=INSTANCE_NAME, + instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME, project_id=GCP_PROJECT_ID, update_mask={"paths": ["memory_size_gb"]}, instance={"memory_size_gb": 2}, @@ -152,7 +171,7 @@ with models.DAG( export_instance = CloudMemorystoreExportInstanceOperator( task_id="export-instance", location="europe-north1", - instance=INSTANCE_NAME, + instance=MEMORYSTORE_REDIS_INSTANCE_NAME, output_config={"gcs_destination": {"uri": EXPORT_GCS_URL}}, project_id=GCP_PROJECT_ID, ) @@ -162,7 +181,7 @@ with models.DAG( import_instance = CloudMemorystoreImportOperator( task_id="import-instance", location="europe-north1", - instance=INSTANCE_NAME_2, + instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2, input_config={"gcs_source": {"uri": EXPORT_GCS_URL}}, project_id=GCP_PROJECT_ID, ) @@ -170,14 +189,17 @@ with models.DAG( # [START howto_operator_delete_instance] delete_instance = CloudMemorystoreDeleteInstanceOperator( - task_id="delete-instance", location="europe-north1", instance=INSTANCE_NAME, project_id=GCP_PROJECT_ID + task_id="delete-instance", + location="europe-north1", + instance=MEMORYSTORE_REDIS_INSTANCE_NAME, + project_id=GCP_PROJECT_ID, ) # [END howto_operator_delete_instance] delete_instance_2 = CloudMemorystoreDeleteInstanceOperator( task_id="delete-instance-2", location="europe-north1", - instance=INSTANCE_NAME_2, + instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2, project_id=GCP_PROJECT_ID, ) @@ -185,7 +207,7 @@ with models.DAG( create_instance_and_import = CloudMemorystoreCreateInstanceAndImportOperator( task_id="create-instance-and-import", location="europe-north1", - instance_id=INSTANCE_NAME_3, + instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME_3, instance=FIRST_INSTANCE, input_config={"gcs_source": {"uri": EXPORT_GCS_URL}}, project_id=GCP_PROJECT_ID, @@ -196,7 +218,7 @@ with models.DAG( scale_instance = CloudMemorystoreScaleInstanceOperator( task_id="scale-instance", location="europe-north1", - instance_id=INSTANCE_NAME_3, + instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME_3, project_id=GCP_PROJECT_ID, memory_size_gb=3, ) @@ -206,7 +228,7 @@ with models.DAG( export_and_delete_instance = CloudMemorystoreExportAndDeleteInstanceOperator( task_id="export-and-delete-instance", location="europe-north1", - instance=INSTANCE_NAME_3, + instance=MEMORYSTORE_REDIS_INSTANCE_NAME_3, output_config={"gcs_destination": {"uri": EXPORT_GCS_URL}}, project_id=GCP_PROJECT_ID, ) @@ -229,3 +251,80 @@ with models.DAG( failover_instance >> delete_instance_2 export_instance >> create_instance_and_import >> scale_instance >> export_and_delete_instance + +with models.DAG( + "gcp_cloud_memorystore_memcached", + schedule_interval=None, # Override to match your needs + start_date=dates.days_ago(1), + tags=['example'], +) as dag_memcache: + # [START howto_operator_create_instance_memcached] + create_memcached_instance = CloudMemorystoreMemcachedCreateInstanceOperator( + task_id="create-instance", + location="europe-north1", + instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME, + instance=MEMCACHED_INSTANCE, + project_id=GCP_PROJECT_ID, + ) + # [END howto_operator_create_instance_memcached] + + # [START howto_operator_delete_instance_memcached] + delete_memcached_instance = CloudMemorystoreMemcachedDeleteInstanceOperator( + task_id="delete-instance", + location="europe-north1", + instance=MEMORYSTORE_MEMCACHED_INSTANCE_NAME, + project_id=GCP_PROJECT_ID, + ) + # [END howto_operator_delete_instance_memcached] + + # [START howto_operator_get_instance_memcached] + get_memcached_instance = CloudMemorystoreMemcachedGetInstanceOperator( + task_id="get-instance", + location="europe-north1", + instance=MEMORYSTORE_MEMCACHED_INSTANCE_NAME, + project_id=GCP_PROJECT_ID, + ) + # [END howto_operator_get_instance_memcached] + + # [START howto_operator_list_instances_memcached] + list_memcached_instances = CloudMemorystoreMemcachedListInstancesOperator( + task_id="list-instances", location="-", project_id=GCP_PROJECT_ID + ) + # [END howto_operator_list_instances_memcached] + + # # [START howto_operator_update_instance_memcached] + update_memcached_instance = CloudMemorystoreMemcachedUpdateInstanceOperator( + task_id="update-instance", + location="europe-north1", + instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME, + project_id=GCP_PROJECT_ID, + update_mask=cloud_memcache.field_mask.FieldMask(paths=["node_count"]), + instance={"node_count": 2}, + ) + # [END howto_operator_update_instance_memcached] + + # [START howto_operator_update_and_apply_parameters_memcached] + update_memcached_parameters = CloudMemorystoreMemcachedUpdateParametersOperator( + task_id="update-parameters", + location="europe-north1", + instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME, + project_id=GCP_PROJECT_ID, + update_mask={"paths": ["params"]}, + parameters={"params": {"protocol": "ascii", "hash_algorithm": "jenkins"}}, + ) + + apply_memcached_parameters = CloudMemorystoreMemcachedApplyParametersOperator( + task_id="apply-parameters", + location="europe-north1", + instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME, + project_id=GCP_PROJECT_ID, + node_ids=["node-a-1"], + apply_all=False, + ) + + # update_parameters >> apply_parameters + # [END howto_operator_update_and_apply_parameters_memcached] + + create_memcached_instance >> [list_memcached_instances, get_memcached_instance] + create_memcached_instance >> update_memcached_instance >> update_memcached_parameters + update_memcached_parameters >> apply_memcached_parameters >> delete_memcached_instance diff --git a/airflow/providers/google/cloud/hooks/cloud_memorystore.py b/airflow/providers/google/cloud/hooks/cloud_memorystore.py index 995db95..a28cce1 100644 --- a/airflow/providers/google/cloud/hooks/cloud_memorystore.py +++ b/airflow/providers/google/cloud/hooks/cloud_memorystore.py @@ -17,13 +17,18 @@ # under the License. """Hooks for Cloud Memorystore service""" from typing import Dict, Optional, Sequence, Tuple, Union +import json from google.api_core.exceptions import NotFound +from google.api_core import path_template from google.api_core.retry import Retry +from google.cloud.memcache_v1beta2 import CloudMemcacheClient +from google.cloud.memcache_v1beta2.types import cloud_memcache from google.cloud.redis_v1 import CloudRedisClient from google.cloud.redis_v1.gapic.enums import FailoverInstanceRequest from google.cloud.redis_v1.types import FieldMask, InputConfig, Instance, OutputConfig from google.protobuf.json_format import ParseDict +import proto from airflow import version from airflow.exceptions import AirflowException @@ -496,3 +501,449 @@ class CloudMemorystoreHook(GoogleBaseHook): ) result.result() self.log.info("Instance updated: %s", instance.name) + + +class CloudMemorystoreMemcachedHook(GoogleBaseHook): + """ + Hook for Google Cloud Memorystore for Memcached service APIs. + + All the methods in the hook where project_id is used must be called with + keyword arguments rather than positional. + + :param gcp_conn_id: The connection ID to use when fetching connection info. + :type gcp_conn_id: str + :param delegate_to: The account to impersonate using domain-wide delegation of authority, + if any. For this to work, the service account making the request must have + domain-wide delegation enabled. + :type delegate_to: str + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account. + :type impersonation_chain: Union[str, Sequence[str]] + """ + + def __init__( + self, + gcp_conn_id: str = "google_cloud_default", + delegate_to: Optional[str] = None, + impersonation_chain: Optional[Union[str, Sequence[str]]] = None, + ) -> None: + super().__init__( + gcp_conn_id=gcp_conn_id, + delegate_to=delegate_to, + impersonation_chain=impersonation_chain, + ) + self._client: Optional[CloudMemcacheClient] = None + + def get_conn( + self, + ): + """ + Retrieves client library object that allow access to Cloud Memorystore Memcached service. + """ + if not self._client: + self._client = CloudMemcacheClient(credentials=self._get_credentials()) + return self._client + + @staticmethod + def _append_label(instance: cloud_memcache.Instance, key: str, val: str) -> cloud_memcache.Instance: + """ + Append labels to provided Instance type + + Labels must fit the regex ``[a-z]([-a-z0-9]*[a-z0-9])?`` (current + airflow version string follows semantic versioning spec: x.y.z). + + :param instance: The proto to append resource_label airflow + version to + :type instance: google.cloud.memcache_v1beta2.types.cloud_memcache.Instance + :param key: The key label + :type key: str + :param val: + :type val: str + :return: The cluster proto updated with new label + """ + val = val.replace(".", "-").replace("+", "-") + instance.labels.update({key: val}) + return instance + + @staticmethod + def proto_message_to_dict(message: proto.Message) -> dict: + """Helper method to parse protobuf message to dictionary.""" + return json.loads(message.__class__.to_json(message)) + + @GoogleBaseHook.fallback_to_default_project_id + def apply_parameters( + self, + node_ids: Sequence[str], + apply_all: bool, + project_id: str, + location: str, + instance_id: str, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ): + """ + Will update current set of Parameters to the set of specified nodes of the Memcached Instance. + + :param node_ids: Nodes to which we should apply the instance-level parameter group. + :type node_ids: Sequence[str] + :param apply_all: Whether to apply instance-level parameter group to all nodes. If set to true, + will explicitly restrict users from specifying any nodes, and apply parameter group updates + to all nodes within the instance. + :type apply_all: bool + :param location: The location of the Cloud Memorystore instance (for example europe-west1) + :type location: str + :param instance_id: The logical name of the Memcached instance in the customer project. + :type instance_id: str + :param project_id: Project ID of the project that contains the instance. If set + to None or missing, the default project_id from the Google Cloud connection is used. + :type project_id: str + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. + :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Additional metadata that is provided to the method. + :type metadata: Sequence[Tuple[str, str]] + """ + client = self.get_conn() + metadata = metadata or () + name = CloudMemcacheClient.instance_path(project_id, location, instance_id) + + self.log.info("Applying update to instance: %s", instance_id) + result = client.apply_parameters( + name=name, node_ids=node_ids, apply_all=apply_all, retry=retry, timeout=timeout, metadata=metadata + ) + result.result() + self.log.info("Instance updated: %s", instance_id) + + @GoogleBaseHook.fallback_to_default_project_id + def create_instance( + self, + location: str, + instance_id: str, + instance: Union[Dict, cloud_memcache.Instance], + project_id: str, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ): + """ + Creates a Memcached instance based on the specified tier and memory size. + + By default, the instance is accessible from the project's `default network + <https://cloud.google.com/compute/docs/networks-and-firewalls#networks>`__. + + :param location: The location of the Cloud Memorystore instance (for example europe-west1) + :type location: str + :param instance_id: Required. The logical name of the Memcached instance in the customer project + with the following restrictions: + + - Must contain only lowercase letters, numbers, and hyphens. + - Must start with a letter. + - Must be between 1-40 characters. + - Must end with a number or a letter. + - Must be unique within the customer project / location + :type instance_id: str + :param instance: Required. A Memcached [Instance] resource + + If a dict is provided, it must be of the same form as the protobuf message + :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.Instance` + :type instance: Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.Instance] + :param project_id: Project ID of the project that contains the instance. If set + to None or missing, the default project_id from the GCP connection is used. + :type project_id: str + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. + :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Additional metadata that is provided to the method. + :type metadata: Sequence[Tuple[str, str]] + """ + client = self.get_conn() + metadata = metadata or () + parent = path_template.expand( + "projects/{project}/locations/{location}", project=project_id, location=location + ) + instance_name = CloudMemcacheClient.instance_path(project_id, location, instance_id) + try: + instance = client.get_instance( + name=instance_name, retry=retry, timeout=timeout, metadata=metadata + ) + self.log.info("Instance exists. Skipping creation.") + return instance + except NotFound: + self.log.info("Instance not exists.") + + if isinstance(instance, dict): + instance = cloud_memcache.Instance(instance) + elif not isinstance(instance, cloud_memcache.Instance): + raise AirflowException("instance is not instance of Instance type or python dict") + + self._append_label(instance, "airflow-version", "v" + version.version) + + result = client.create_instance( + parent=parent, + instance_id=instance_id, + resource=instance, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + result.result() + self.log.info("Instance created.") + return client.get_instance(name=instance_name, retry=retry, timeout=timeout, metadata=metadata) + + @GoogleBaseHook.fallback_to_default_project_id + def delete_instance( + self, + location: str, + instance: str, + project_id: str, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ): + """ + Deletes a specific Memcached instance. Instance stops serving and data is deleted. + + :param location: The location of the Cloud Memorystore instance (for example europe-west1) + :type location: str + :param instance: The logical name of the Memcached instance in the customer project. + :type instance: str + :param project_id: Project ID of the project that contains the instance. If set + to None or missing, the default project_id from the GCP connection is used. + :type project_id: str + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. + :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Additional metadata that is provided to the method. + :type metadata: Sequence[Tuple[str, str]] + """ + client = self.get_conn() + metadata = metadata or () + name = CloudMemcacheClient.instance_path(project_id, location, instance) + self.log.info("Fetching Instance: %s", name) + instance = client.get_instance(name=name, retry=retry, timeout=timeout, metadata=metadata) + + if not instance: + return + + self.log.info("Deleting Instance: %s", name) + result = client.delete_instance(name=name, retry=retry, timeout=timeout, metadata=metadata) + result.result() + self.log.info("Instance deleted: %s", name) + + @GoogleBaseHook.fallback_to_default_project_id + def get_instance( + self, + location: str, + instance: str, + project_id: str, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ): + """ + Gets the details of a specific Memcached instance. + + :param location: The location of the Cloud Memorystore instance (for example europe-west1) + :type location: str + :param instance: The logical name of the Memcached instance in the customer project. + :type instance: str + :param project_id: Project ID of the project that contains the instance. If set + to None or missing, the default project_id from the GCP connection is used. + :type project_id: str + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. + :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Additional metadata that is provided to the method. + :type metadata: Sequence[Tuple[str, str]] + """ + client = self.get_conn() + metadata = metadata or () + name = CloudMemcacheClient.instance_path(project_id, location, instance) + result = client.get_instance(name=name, retry=retry, timeout=timeout, metadata=metadata or ()) + self.log.info("Fetched Instance: %s", name) + return result + + @GoogleBaseHook.fallback_to_default_project_id + def list_instances( + self, + location: str, + project_id: str, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ): + """ + Lists all Memcached instances owned by a project in either the specified location (region) or all + locations. + + :param location: The location of the Cloud Memorystore instance (for example europe-west1) + + If it is specified as ``-`` (wildcard), then all regions available to the project are + queried, and the results are aggregated. + :type location: str + :param project_id: Project ID of the project that contains the instance. If set + to None or missing, the default project_id from the GCP connection is used. + :type project_id: str + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. + :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Additional metadata that is provided to the method. + :type metadata: Sequence[Tuple[str, str]] + """ + client = self.get_conn() + metadata = metadata or () + parent = path_template.expand( + "projects/{project}/locations/{location}", project=project_id, location=location + ) + result = client.list_instances(parent=parent, retry=retry, timeout=timeout, metadata=metadata) + self.log.info("Fetched instances") + return result + + @GoogleBaseHook.fallback_to_default_project_id + def update_instance( + self, + update_mask: Union[Dict, cloud_memcache.field_mask.FieldMask], + instance: Union[Dict, cloud_memcache.Instance], + project_id: str, + location: Optional[str] = None, + instance_id: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ): + """ + Updates the metadata and configuration of a specific Memcached instance. + + :param update_mask: Required. Mask of fields to update. At least one path must be supplied in this + field. The elements of the repeated paths field may only include these fields from ``Instance``: + + - ``displayName`` + + If a dict is provided, it must be of the same form as the protobuf message + :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.field_mask.FieldMask` + :type update_mask: + Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.field_mask.FieldMask] + :param instance: Required. Update description. Only fields specified in ``update_mask`` are updated. + + If a dict is provided, it must be of the same form as the protobuf message + :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.Instance` + :type instance: Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.Instance] + :param location: The location of the Cloud Memorystore instance (for example europe-west1) + :type location: str + :param instance_id: The logical name of the Memcached instance in the customer project. + :type instance_id: str + :param project_id: Project ID of the project that contains the instance. If set + to None or missing, the default project_id from the Google Cloud connection is used. + :type project_id: str + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. + :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Additional metadata that is provided to the method. + :type metadata: Sequence[Tuple[str, str]] + """ + client = self.get_conn() + metadata = metadata or () + + if isinstance(instance, dict): + instance = cloud_memcache.Instance(instance) + elif not isinstance(instance, cloud_memcache.Instance): + raise AirflowException("instance is not instance of Instance type or python dict") + + if location and instance_id: + name = CloudMemcacheClient.instance_path(project_id, location, instance_id) + instance.name = name + + self.log.info("Updating instances: %s", instance.name) + result = client.update_instance( + update_mask=update_mask, resource=instance, retry=retry, timeout=timeout, metadata=metadata + ) + result.result() + self.log.info("Instance updated: %s", instance.name) + + @GoogleBaseHook.fallback_to_default_project_id + def update_parameters( + self, + update_mask: Union[Dict, cloud_memcache.field_mask.FieldMask], + parameters: Union[Dict, cloud_memcache.MemcacheParameters], + project_id: str, + location: str, + instance_id: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ): + """ + Updates the defined Memcached Parameters for an existing Instance. This method only stages the + parameters, it must be followed by apply_parameters to apply the parameters to nodes of + the Memcached Instance. + + :param update_mask: Required. Mask of fields to update. + If a dict is provided, it must be of the same form as the protobuf message + :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.field_mask.FieldMask` + :type update_mask: + Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.field_mask.FieldMask] + :param parameters: The parameters to apply to the instance. + If a dict is provided, it must be of the same form as the protobuf message + :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.MemcacheParameters` + :type parameters: Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.MemcacheParameters] + :param location: The location of the Cloud Memorystore instance (for example europe-west1) + :type location: str + :param instance_id: The logical name of the Memcached instance in the customer project. + :type instance_id: str + :param project_id: Project ID of the project that contains the instance. If set + to None or missing, the default project_id from the Google Cloud connection is used. + :type project_id: str + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. + :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Additional metadata that is provided to the method. + :type metadata: Sequence[Tuple[str, str]] + """ + client = self.get_conn() + metadata = metadata or () + + if isinstance(parameters, dict): + parameters = cloud_memcache.MemcacheParameters(parameters) + elif not isinstance(parameters, cloud_memcache.MemcacheParameters): + raise AirflowException("instance is not instance of MemcacheParameters type or python dict") + + name = CloudMemcacheClient.instance_path(project_id, location, instance_id) + self.log.info("Staging update to instance: %s", instance_id) + result = client.update_parameters( + name=name, + update_mask=update_mask, + parameters=parameters, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + result.result() + self.log.info("Update staged for instance: %s", instance_id) diff --git a/airflow/providers/google/cloud/operators/cloud_memorystore.py b/airflow/providers/google/cloud/operators/cloud_memorystore.py index 0600914..b4ec705 100644 --- a/airflow/providers/google/cloud/operators/cloud_memorystore.py +++ b/airflow/providers/google/cloud/operators/cloud_memorystore.py @@ -19,12 +19,16 @@ from typing import Dict, Optional, Sequence, Tuple, Union from google.api_core.retry import Retry +from google.cloud.memcache_v1beta2.types import cloud_memcache from google.cloud.redis_v1.gapic.enums import FailoverInstanceRequest from google.cloud.redis_v1.types import FieldMask, InputConfig, Instance, OutputConfig from google.protobuf.json_format import MessageToDict from airflow.models import BaseOperator -from airflow.providers.google.cloud.hooks.cloud_memorystore import CloudMemorystoreHook +from airflow.providers.google.cloud.hooks.cloud_memorystore import ( + CloudMemorystoreHook, + CloudMemorystoreMemcachedHook, +) from airflow.utils.decorators import apply_defaults @@ -1108,3 +1112,613 @@ class CloudMemorystoreExportAndDeleteInstanceOperator(BaseOperator): timeout=self.timeout, metadata=self.metadata, ) + + +class CloudMemorystoreMemcachedApplyParametersOperator(BaseOperator): + """ + Will update current set of Parameters to the set of specified nodes of the Memcached Instance. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:CloudMemorystoreMemcachedApplyParametersOperator` + + :param node_ids: Nodes to which we should apply the instance-level parameter group. + :type node_ids: Sequence[str] + :param apply_all: Whether to apply instance-level parameter group to all nodes. If set to true, + will explicitly restrict users from specifying any nodes, and apply parameter group updates + to all nodes within the instance. + :type apply_all: bool + :param location: The location of the Cloud Memorystore instance (for example europe-west1) + :type location: str + :param instance_id: The logical name of the Memcached instance in the customer project. + :type instance_id: str + :param project_id: Project ID of the project that contains the instance. If set + to None or missing, the default project_id from the Google Cloud connection is used. + :type project_id: str + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. + :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Additional metadata that is provided to the method. + :type metadata: Sequence[Tuple[str, str]] + """ + + template_fields = ( + "node_ids", + "apply_all", + "location", + "instance_id", + "project_id", + "retry", + "timeout", + "metadata", + "gcp_conn_id", + "impersonation_chain", + ) + + @apply_defaults + def __init__( + self, + *, + node_ids: Sequence[str], + apply_all: bool, + location: str, + instance_id: str, + project_id: str, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: Optional[Union[str, Sequence[str]]] = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.node_ids = node_ids + self.apply_all = apply_all + self.location = location + self.instance_id = instance_id + self.project_id = project_id + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute(self, context: Dict): + hook = CloudMemorystoreMemcachedHook( + gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain + ) + hook.apply_parameters( + node_ids=self.node_ids, + apply_all=self.apply_all, + location=self.location, + instance_id=self.instance_id, + project_id=self.project_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + + +class CloudMemorystoreMemcachedCreateInstanceOperator(BaseOperator): + """ + Creates a Memcached instance based on the specified tier and memory size. + + By default, the instance is accessible from the project's `default network + <https://cloud.google.com/compute/docs/networks-and-firewalls#networks>`__. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:CloudMemorystoreMemcachedCreateInstanceOperator` + + :param location: The location of the Cloud Memorystore instance (for example europe-west1) + :type location: str + :param instance_id: Required. The logical name of the Memcached instance in the customer project with the + following restrictions: + + - Must contain only lowercase letters, numbers, and hyphens. + - Must start with a letter. + - Must be between 1-40 characters. + - Must end with a number or a letter. + - Must be unique within the customer project / location + :type instance_id: str + :param instance: Required. A Memcached [Instance] resource + + If a dict is provided, it must be of the same form as the protobuf message + :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.Instance` + :type instance: Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.Instance] + :param project_id: Project ID of the project that contains the instance. If set + to None or missing, the default project_id from the GCP connection is used. + :type project_id: str + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. + :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Additional metadata that is provided to the method. + :type metadata: Sequence[Tuple[str, str]] + :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. + :type gcp_conn_id: str + """ + + template_fields = ( + "location", + "instance_id", + "instance", + "project_id", + "retry", + "timeout", + "metadata", + "gcp_conn_id", + ) + + @apply_defaults + def __init__( + self, + location: str, + instance_id: str, + instance: Union[Dict, cloud_memcache.Instance], + project_id: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + gcp_conn_id: str = "google_cloud_default", + *args, + **kwargs, + ) -> None: + super().__init__(*args, **kwargs) + self.location = location + self.instance_id = instance_id + self.instance = instance + self.project_id = project_id + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + + def execute(self, context: Dict): + hook = CloudMemorystoreMemcachedHook(gcp_conn_id=self.gcp_conn_id) + result = hook.create_instance( + location=self.location, + instance_id=self.instance_id, + instance=self.instance, + project_id=self.project_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + return hook.proto_message_to_dict(result) + + +class CloudMemorystoreMemcachedDeleteInstanceOperator(BaseOperator): + """ + Deletes a specific Memcached instance. Instance stops serving and data is deleted. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:CloudMemorystoreMemcachedDeleteInstanceOperator` + + :param location: The location of the Cloud Memorystore instance (for example europe-west1) + :type location: str + :param instance: The logical name of the Memcached instance in the customer project. + :type instance: str + :param project_id: Project ID of the project that contains the instance. If set + to None or missing, the default project_id from the GCP connection is used. + :type project_id: str + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. + :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Additional metadata that is provided to the method. + :type metadata: Sequence[Tuple[str, str]] + :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. + :type gcp_conn_id: str + """ + + template_fields = ("location", "instance", "project_id", "retry", "timeout", "metadata", "gcp_conn_id") + + @apply_defaults + def __init__( + self, + location: str, + instance: str, + project_id: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + gcp_conn_id: str = "google_cloud_default", + *args, + **kwargs, + ) -> None: + super().__init__(*args, **kwargs) + self.location = location + self.instance = instance + self.project_id = project_id + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + + def execute(self, context: Dict): + hook = CloudMemorystoreMemcachedHook(gcp_conn_id=self.gcp_conn_id) + hook.delete_instance( + location=self.location, + instance=self.instance, + project_id=self.project_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + + +class CloudMemorystoreMemcachedGetInstanceOperator(BaseOperator): + """ + Gets the details of a specific Memcached instance. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:CloudMemorystoreMemcachedGetInstanceOperator` + + :param location: The location of the Cloud Memorystore instance (for example europe-west1) + :type location: str + :param instance: The logical name of the Memcached instance in the customer project. + :type instance: str + :param project_id: Project ID of the project that contains the instance. If set + to None or missing, the default project_id from the Google Cloud connection is used. + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. + :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Additional metadata that is provided to the method. + :type metadata: Sequence[Tuple[str, str]] + :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. + :type gcp_conn_id: str + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + :type impersonation_chain: Union[str, Sequence[str]] + """ + + template_fields = ( + "location", + "instance", + "project_id", + "retry", + "timeout", + "metadata", + "gcp_conn_id", + "impersonation_chain", + ) + + @apply_defaults + def __init__( + self, + *, + location: str, + instance: str, + project_id: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: Optional[Union[str, Sequence[str]]] = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.location = location + self.instance = instance + self.project_id = project_id + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute(self, context: Dict): + hook = CloudMemorystoreMemcachedHook( + gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain + ) + result = hook.get_instance( + location=self.location, + instance=self.instance, + project_id=self.project_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + return hook.proto_message_to_dict(result) + + +class CloudMemorystoreMemcachedListInstancesOperator(BaseOperator): + """ + Lists all Memcached instances owned by a project in either the specified location (region) or all + locations. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:CloudMemorystoreMemcachedListInstancesOperator` + + :param location: The location of the Cloud Memorystore instance (for example europe-west1) + If it is specified as ``-`` (wildcard), then all regions available to the project are + queried, and the results are aggregated. + :type location: str + :param project_id: Project ID of the project that contains the instance. If set + to None or missing, the default project_id from the Google Cloud connection is used. + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. + :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Additional metadata that is provided to the method. + :type metadata: Sequence[Tuple[str, str]] + :param gcp_conn_id: The connection ID to use connecting to Google Cloud. + :type gcp_conn_id: str + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + :type impersonation_chain: Union[str, Sequence[str]] + """ + + template_fields = ( + "location", + "project_id", + "retry", + "timeout", + "metadata", + "gcp_conn_id", + "impersonation_chain", + ) + + @apply_defaults + def __init__( + self, + *, + location: str, + project_id: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: Optional[Union[str, Sequence[str]]] = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.location = location + self.project_id = project_id + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute(self, context: Dict): + hook = CloudMemorystoreMemcachedHook( + gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain + ) + result = hook.list_instances( + location=self.location, + project_id=self.project_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + instances = [hook.proto_message_to_dict(a) for a in result] + return instances + + +class CloudMemorystoreMemcachedUpdateInstanceOperator(BaseOperator): + """ + Updates the metadata and configuration of a specific Memcached instance. + + :param update_mask: Required. Mask of fields to update. At least one path must be supplied in this field. + The elements of the repeated paths field may only include these fields from ``Instance``: + + - ``displayName`` + + If a dict is provided, it must be of the same form as the protobuf message + :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.field_mask.FieldMas` + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:CloudMemorystoreMemcachedUpdateInstanceOperator` + + :type update_mask: Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.field_mask.FieldMask] + :param instance: Required. Update description. Only fields specified in update_mask are updated. + + If a dict is provided, it must be of the same form as the protobuf message + :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.Instance` + :type instance: Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.Instance] + :param location: The location of the Cloud Memorystore instance (for example europe-west1) + :type location: str + :param instance_id: The logical name of the Memcached instance in the customer project. + :type instance_id: str + :param project_id: Project ID of the project that contains the instance. If set + to None or missing, the default project_id from the Google Cloud connection is used. + :type project_id: str + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. + :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Additional metadata that is provided to the method. + :type metadata: Sequence[Tuple[str, str]] + :param gcp_conn_id: The connection ID to use connecting to Google Cloud. + :type gcp_conn_id: str + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + :type impersonation_chain: Union[str, Sequence[str]] + """ + + template_fields = ( + "update_mask", + "instance", + "location", + "instance_id", + "project_id", + "retry", + "timeout", + "metadata", + "gcp_conn_id", + "impersonation_chain", + ) + + @apply_defaults + def __init__( + self, + *, + update_mask: Union[Dict, cloud_memcache.field_mask.FieldMask], + instance: Union[Dict, cloud_memcache.Instance], + location: Optional[str] = None, + instance_id: Optional[str] = None, + project_id: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: Optional[Union[str, Sequence[str]]] = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.update_mask = update_mask + self.instance = instance + self.location = location + self.instance_id = instance_id + self.project_id = project_id + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute(self, context: Dict): + hook = CloudMemorystoreMemcachedHook( + gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain + ) + hook.update_instance( + update_mask=self.update_mask, + instance=self.instance, + location=self.location, + instance_id=self.instance_id, + project_id=self.project_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + + +class CloudMemorystoreMemcachedUpdateParametersOperator(BaseOperator): + """ + Updates the defined Memcached Parameters for an existing Instance. This method only stages the + parameters, it must be followed by apply_parameters to apply the parameters to nodes of + the Memcached Instance. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:CloudMemorystoreMemcachedApplyParametersOperator` + + :param update_mask: Required. Mask of fields to update. + If a dict is provided, it must be of the same form as the protobuf message + :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.field_mask.FieldMask` + :type update_mask: + Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.field_mask.FieldMask] + :param parameters: The parameters to apply to the instance. + If a dict is provided, it must be of the same form as the protobuf message + :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.MemcacheParameters` + :type parameters: Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.MemcacheParameters] + :param location: The location of the Cloud Memorystore instance (for example europe-west1) + :type location: str + :param instance_id: The logical name of the Memcached instance in the customer project. + :type instance_id: str + :param project_id: Project ID of the project that contains the instance. If set + to None or missing, the default project_id from the Google Cloud connection is used. + :type project_id: str + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. + :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Additional metadata that is provided to the method. + :type metadata: Sequence[Tuple[str, str]] + """ + + template_fields = ( + "update_mask", + "parameters", + "location", + "instance_id", + "project_id", + "retry", + "timeout", + "metadata", + "gcp_conn_id", + "impersonation_chain", + ) + + @apply_defaults + def __init__( + self, + *, + update_mask: Union[Dict, cloud_memcache.field_mask.FieldMask], + parameters: Union[Dict, cloud_memcache.MemcacheParameters], + location: str, + instance_id: str, + project_id: str, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: Optional[Union[str, Sequence[str]]] = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.update_mask = update_mask + self.parameters = parameters + self.location = location + self.instance_id = instance_id + self.project_id = project_id + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute(self, context: Dict): + hook = CloudMemorystoreMemcachedHook( + gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain + ) + hook.update_parameters( + update_mask=self.update_mask, + parameters=self.parameters, + location=self.location, + instance_id=self.instance_id, + project_id=self.project_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) diff --git a/docs/howto/operator/google/cloud/cloud_memorystore_memcached.rst b/docs/howto/operator/google/cloud/cloud_memorystore_memcached.rst new file mode 100644 index 0000000..6483c75 --- /dev/null +++ b/docs/howto/operator/google/cloud/cloud_memorystore_memcached.rst @@ -0,0 +1,158 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + + +Google Cloud Memorystore Memcached Operators +============================================ + +The `Cloud Memorystore for Memcached <https://cloud.google.com/memorystore/docs/memcached/>`__ is a fully managed +Memcached service for Google Cloud. Applications running on Google Cloud can achieve extreme performance by +leveraging the highly scalable, available, secure Memcached service without the burden of managing complex +Memcached deployments. + +.. contents:: + :depth: 1 + :local: + +Prerequisite Tasks +^^^^^^^^^^^^^^^^^^ + +.. include:: /howto/operator/google/_partials/prerequisite_tasks.rst + + +.. _howto/operator:CloudMemorystoreMemcachedInstance: + +Instance +^^^^^^^^ + +Operators uses a :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.Instance` for representing instance. +The object can be presented as a compatible dictionary also. + +Here is an example of instance + +.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py + :language: python + :start-after: [START howto_operator_memcached_instance] + :end-before: [END howto_operator_memcached_instance] + + +.. _howto/operator:CloudMemorystoreMemcachedCreateInstanceOperator: + +Create instance +^^^^^^^^^^^^^^^ + +Create a instance is performed with the +:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedCreateInstanceOperator` +operator. + +.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_create_instance_memcached] + :end-before: [END howto_operator_create_instance_memcached] + + +.. _howto/operator:CloudMemorystoreMemcachedDeleteInstanceOperator: + +Delete instance +^^^^^^^^^^^^^^^ + +Delete an instance is performed with the +:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedDeleteInstanceOperator` +operator. + +.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_delete_instance_memcached] + :end-before: [END howto_operator_delete_instance_memcached] + + +.. _howto/operator:CloudMemorystoreMemcachedGetInstanceOperator: + +Get instance +^^^^^^^^^^^^ + +Get an instance is performed with the +:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedGetInstanceOperator` +operator. + +.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_get_instance_memcached] + :end-before: [END howto_operator_get_instance_memcached] + + +.. _howto/operator:CloudMemorystoreMemcachedListInstancesOperator: + +List instances +^^^^^^^^^^^^^^ + +List instances is performed with the +:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedListInstancesOperator` +operator. + +.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_list_instances_memcached] + :end-before: [END howto_operator_list_instances_memcached] + + +.. _howto/operator:CloudMemorystoreMemcachedUpdateInstanceOperator: + +Update instance +^^^^^^^^^^^^^^^ + +Updating an instance is performed with the +:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedUpdateInstanceOperator` +operator. + +.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_update_instance_memcached] + :end-before: [END howto_operator_update_instance_memcached] + + +.. _howto/operator:CloudMemorystoreMemcachedApplyParametersOperator: + +Update and apply parameters to an instance +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +To update and apply Memcached parameters to an instance use +:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedUpdateParametersOperator` +and +:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedApplyParametersOperator` +operator. + +.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_update_and_apply_parameters_memcached] + :end-before: [END howto_operator_update_and_apply_parameters_memcached] + + +Reference +^^^^^^^^^ + +For further information, look at: + +* `Client Library Documentation <https://googleapis.dev/python/memcache/latest/index.html>`__ +* `Product Documentation <https://cloud.google.com/memorystore/docs/memcached/>`__ diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst index 9b17642..58296cd 100644 --- a/docs/operators-and-hooks-ref.rst +++ b/docs/operators-and-hooks-ref.rst @@ -845,12 +845,18 @@ These integrations allow you to perform various operations within the Google Clo - :mod:`airflow.providers.google.cloud.operators.mlengine` - - * - `Cloud Memorystore <https://cloud.google.com/memorystore/>`__ + * - `Cloud Memorystore Redis <https://cloud.google.com/memorystore/>`__ - :doc:`How to use <howto/operator/google/cloud/cloud_memorystore>` - :mod:`airflow.providers.google.cloud.hooks.cloud_memorystore` - :mod:`airflow.providers.google.cloud.operators.cloud_memorystore` - + * - `Cloud Memorystore Memcached <https://cloud.google.com/memorystore/>`__ + - :doc:`How to use <howto/operator/google/cloud/cloud_memorystore_memcached>` + - :mod:`airflow.providers.google.cloud.hooks.cloud_memorystore` + - :mod:`airflow.providers.google.cloud.operators.cloud_memorystore` + - + * - `Natural Language <https://cloud.google.com/natural-language/>`__ - :doc:`How to use <howto/operator/google/cloud/natural_language>` - :mod:`airflow.providers.google.cloud.hooks.natural_language` diff --git a/setup.py b/setup.py index ef3ee97..c6952b5 100644 --- a/setup.py +++ b/setup.py @@ -268,6 +268,7 @@ google = [ 'google-cloud-kms>=1.2.1,<2.0.0', 'google-cloud-language>=1.1.1,<2.0.0', 'google-cloud-logging>=1.14.0,<2.0.0', + 'google-cloud-memcache>=0.2.0', 'google-cloud-monitoring>=0.34.0,<2.0.0', 'google-cloud-pubsub>=1.0.0,<2.0.0', 'google-cloud-redis>=0.3.0,<2.0.0', diff --git a/tests/providers/google/cloud/hooks/test_cloud_memorystore.py b/tests/providers/google/cloud/hooks/test_cloud_memorystore.py index fe3a1f8..8f92e79 100644 --- a/tests/providers/google/cloud/hooks/test_cloud_memorystore.py +++ b/tests/providers/google/cloud/hooks/test_cloud_memorystore.py @@ -20,12 +20,16 @@ from unittest import TestCase, mock from google.api_core.retry import Retry from google.cloud.exceptions import NotFound +from google.cloud.memcache_v1beta2.types import cloud_memcache from google.cloud.redis_v1.types import Instance from mock import PropertyMock from airflow import version from airflow.exceptions import AirflowException -from airflow.providers.google.cloud.hooks.cloud_memorystore import CloudMemorystoreHook +from airflow.providers.google.cloud.hooks.cloud_memorystore import ( + CloudMemorystoreHook, + CloudMemorystoreMemcachedHook, +) from tests.providers.google.cloud.utils.base_gcp_mock import ( GCP_PROJECT_ID_HOOK_UNIT_TEST, mock_base_gcp_hook_default_project_id, @@ -42,6 +46,7 @@ TEST_TIMEOUT = 10 # type: float TEST_METADATA = [("KEY", "VALUE")] # type: Sequence[Tuple[str, str]] TEST_PAGE_SIZE = 100 # type: int TEST_UPDATE_MASK = {"paths": ["memory_size_gb"]} # type: Dict +TEST_UPDATE_MASK_MEMCACHED = {"displayName": "updated name"} # type: Dict TEST_PARENT = "projects/test-project-id/locations/test-location" # type: str TEST_NAME = "projects/test-project-id/locations/test-location/instances/test-instance-id" # type: str TEST_PARENT_DEFAULT_PROJECT_ID = "projects/{}/locations/test-location".format( @@ -429,3 +434,184 @@ class TestCloudMemorystoreWithoutDefaultProjectIdHook(TestCase): timeout=TEST_TIMEOUT, metadata=TEST_METADATA, ) + + +class TestCloudMemorystoreMemcachedWithDefaultProjectIdHook(TestCase): + def setUp( + self, + ): + with mock.patch( + "airflow.providers.google.cloud.hooks.cloud_memorystore.CloudMemorystoreMemcachedHook.__init__", + new=mock_base_gcp_hook_default_project_id, + ): + self.hook = CloudMemorystoreMemcachedHook(gcp_conn_id="test") + + @mock.patch( + 'airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id', + new_callable=PropertyMock, + return_value=GCP_PROJECT_ID_HOOK_UNIT_TEST, + ) + @mock.patch( + "airflow.providers.google.cloud.hooks.cloud_memorystore.CloudMemorystoreMemcachedHook.get_conn" + ) + def test_create_instance_when_exists(self, mock_get_conn, mock_project_id): + mock_get_conn.return_value.get_instance.return_value = cloud_memcache.Instance(name=TEST_NAME) + result = self.hook.create_instance( # pylint: disable=no-value-for-parameter + location=TEST_LOCATION, + instance_id=TEST_INSTANCE_ID, + instance=cloud_memcache.Instance(name=TEST_NAME), + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + mock_get_conn.return_value.get_instance.assert_called_once_with( + name=TEST_NAME_DEFAULT_PROJECT_ID, retry=TEST_RETRY, timeout=TEST_TIMEOUT, metadata=TEST_METADATA + ) + self.assertEqual(cloud_memcache.Instance(name=TEST_NAME), result) + + @mock.patch( + 'airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id', + new_callable=PropertyMock, + return_value=GCP_PROJECT_ID_HOOK_UNIT_TEST, + ) + @mock.patch( + "airflow.providers.google.cloud.hooks.cloud_memorystore.CloudMemorystoreMemcachedHook.get_conn" + ) + def test_create_instance_when_not_exists(self, mock_get_conn, mock_project_id): + mock_get_conn.return_value.get_instance.side_effect = [ + NotFound("Instance not found"), + cloud_memcache.Instance(name=TEST_NAME), + ] + mock_get_conn.return_value.create_instance.return_value.result.return_value = cloud_memcache.Instance( + name=TEST_NAME + ) + result = self.hook.create_instance( # pylint: disable=no-value-for-parameter + location=TEST_LOCATION, + instance_id=TEST_INSTANCE_ID, + instance=cloud_memcache.Instance(name=TEST_NAME), + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + mock_get_conn.return_value.get_instance.has_calls( + [ + mock.call(name=TEST_NAME, retry=TEST_RETRY, timeout=TEST_TIMEOUT, metadata=TEST_METADATA), + mock.call(name=TEST_NAME, retry=TEST_RETRY, timeout=TEST_TIMEOUT, metadata=TEST_METADATA), + ] + ) + mock_get_conn.return_value.create_instance.assert_called_once_with( + resource=cloud_memcache.Instance( + name=TEST_NAME, + labels={"airflow-version": "v" + version.version.replace(".", "-").replace("+", "-")}, + ), + instance_id=TEST_INSTANCE_ID, + metadata=TEST_METADATA, + parent=TEST_PARENT_DEFAULT_PROJECT_ID, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + ) + self.assertEqual(cloud_memcache.Instance(name=TEST_NAME), result) + + @mock.patch( + 'airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id', + new_callable=PropertyMock, + return_value=GCP_PROJECT_ID_HOOK_UNIT_TEST, + ) + @mock.patch( + "airflow.providers.google.cloud.hooks.cloud_memorystore.CloudMemorystoreMemcachedHook.get_conn" + ) + def test_delete_instance(self, mock_get_conn, mock_project_id): + self.hook.delete_instance( # pylint: disable=no-value-for-parameter + location=TEST_LOCATION, + instance=TEST_INSTANCE_ID, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + mock_get_conn.return_value.delete_instance.assert_called_once_with( + name=TEST_NAME_DEFAULT_PROJECT_ID, retry=TEST_RETRY, timeout=TEST_TIMEOUT, metadata=TEST_METADATA + ) + + @mock.patch( + 'airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id', + new_callable=PropertyMock, + return_value=GCP_PROJECT_ID_HOOK_UNIT_TEST, + ) + @mock.patch( + "airflow.providers.google.cloud.hooks.cloud_memorystore.CloudMemorystoreMemcachedHook.get_conn" + ) + def test_get_instance(self, mock_get_conn, mock_project_id): + self.hook.get_instance( # pylint: disable=no-value-for-parameter + location=TEST_LOCATION, + instance=TEST_INSTANCE_ID, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + mock_get_conn.return_value.get_instance.assert_called_once_with( + name=TEST_NAME_DEFAULT_PROJECT_ID, retry=TEST_RETRY, timeout=TEST_TIMEOUT, metadata=TEST_METADATA + ) + + @mock.patch( + 'airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id', + new_callable=PropertyMock, + return_value=GCP_PROJECT_ID_HOOK_UNIT_TEST, + ) + @mock.patch( + "airflow.providers.google.cloud.hooks.cloud_memorystore.CloudMemorystoreMemcachedHook.get_conn" + ) + def test_list_instances(self, mock_get_conn, mock_project_id): + self.hook.list_instances( # pylint: disable=no-value-for-parameter + location=TEST_LOCATION, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + mock_get_conn.return_value.list_instances.assert_called_once_with( + parent=TEST_PARENT_DEFAULT_PROJECT_ID, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + + @mock.patch( + 'airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id', + new_callable=PropertyMock, + return_value=GCP_PROJECT_ID_HOOK_UNIT_TEST, + ) + @mock.patch( + "airflow.providers.google.cloud.hooks.cloud_memorystore.CloudMemorystoreMemcachedHook.get_conn" + ) + def test_update_instance(self, mock_get_conn, mock_project_id): + self.hook.update_instance( # pylint: disable=no-value-for-parameter + update_mask=TEST_UPDATE_MASK_MEMCACHED, + instance=cloud_memcache.Instance(name=TEST_NAME), + location=TEST_LOCATION, + instance_id=TEST_INSTANCE_ID, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + mock_get_conn.return_value.update_instance.assert_called_once_with( + update_mask=TEST_UPDATE_MASK_MEMCACHED, + resource=cloud_memcache.Instance(name=TEST_NAME_DEFAULT_PROJECT_ID), + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + + def test_proto_functions(self): + instance_dict = { + 'name': 'test_name', + 'node_count': 1, + 'node_config': {'cpu_count': 1, 'memory_size_mb': 1024}, + } + instance = cloud_memcache.Instance(instance_dict) + instance_dict_result = self.hook.proto_message_to_dict(instance) + self.assertEqual(instance_dict_result["name"], instance_dict["name"]) + self.assertEqual( + instance_dict_result["nodeConfig"]["cpuCount"], instance_dict["node_config"]["cpu_count"] + ) + self.assertEqual( + instance_dict_result["nodeConfig"]["memorySizeMb"], instance_dict["node_config"]["memory_size_mb"] + ) diff --git a/tests/providers/google/cloud/operators/test_cloud_memorystore.py b/tests/providers/google/cloud/operators/test_cloud_memorystore.py index 384a572..85b17e5 100644 --- a/tests/providers/google/cloud/operators/test_cloud_memorystore.py +++ b/tests/providers/google/cloud/operators/test_cloud_memorystore.py @@ -33,6 +33,11 @@ from airflow.providers.google.cloud.operators.cloud_memorystore import ( CloudMemorystoreListInstancesOperator, CloudMemorystoreScaleInstanceOperator, CloudMemorystoreUpdateInstanceOperator, + CloudMemorystoreMemcachedCreateInstanceOperator, + CloudMemorystoreMemcachedDeleteInstanceOperator, + CloudMemorystoreMemcachedGetInstanceOperator, + CloudMemorystoreMemcachedListInstancesOperator, + CloudMemorystoreMemcachedUpdateInstanceOperator, ) TEST_GCP_CONN_ID = "test-gcp-conn-id" @@ -52,6 +57,7 @@ TEST_DATA_PROTECTION_MODE = FailoverInstanceRequest.DataProtectionMode.LIMITED_D TEST_INPUT_CONFIG = {"gcs_source": {"uri": "gs://test-bucket/file.rdb"}} # type: Dict TEST_PAGE_SIZE = 100 # type: int TEST_UPDATE_MASK = {"paths": ["memory_size_gb"]} # TODO: Fill missing value +TEST_UPDATE_MASK_MEMCACHED = {"displayName": "memcached instance"} TEST_PARENT = "test-parent" TEST_NAME = "test-name" @@ -375,3 +381,144 @@ class TestCloudMemorystoreCreateInstanceAndImportOperatorOperator(TestCase): ), ] ) + + +class TestCloudMemorystoreMemcachedCreateInstanceOperator(TestCase): + @mock.patch("airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedHook") + def test_assert_valid_hook_call(self, mock_hook): + task = CloudMemorystoreMemcachedCreateInstanceOperator( + task_id=TEST_TASK_ID, + location=TEST_LOCATION, + instance_id=TEST_INSTANCE_ID, + instance=TEST_INSTANCE, + project_id=TEST_PROJECT_ID, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + gcp_conn_id=TEST_GCP_CONN_ID, + ) + task.execute(mock.MagicMock()) + mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID) + mock_hook.return_value.create_instance.assert_called_once_with( + location=TEST_LOCATION, + instance_id=TEST_INSTANCE_ID, + instance=TEST_INSTANCE, + project_id=TEST_PROJECT_ID, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + + +class TestCloudMemorystoreMemcachedDeleteInstanceOperator(TestCase): + @mock.patch("airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedHook") + def test_assert_valid_hook_call(self, mock_hook): + task = CloudMemorystoreMemcachedDeleteInstanceOperator( + task_id=TEST_TASK_ID, + location=TEST_LOCATION, + instance=TEST_INSTANCE_NAME, + project_id=TEST_PROJECT_ID, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + gcp_conn_id=TEST_GCP_CONN_ID, + ) + task.execute(mock.MagicMock()) + mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID) + mock_hook.return_value.delete_instance.assert_called_once_with( + location=TEST_LOCATION, + instance=TEST_INSTANCE_NAME, + project_id=TEST_PROJECT_ID, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + + +class TestCloudMemorystoreMemcachedGetInstanceOperator(TestCase): + @mock.patch("airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedHook") + def test_assert_valid_hook_call(self, mock_hook): + task = CloudMemorystoreMemcachedGetInstanceOperator( + task_id=TEST_TASK_ID, + location=TEST_LOCATION, + instance=TEST_INSTANCE_NAME, + project_id=TEST_PROJECT_ID, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + gcp_conn_id=TEST_GCP_CONN_ID, + impersonation_chain=TEST_IMPERSONATION_CHAIN, + ) + task.execute(mock.MagicMock()) + mock_hook.assert_called_once_with( + gcp_conn_id=TEST_GCP_CONN_ID, + impersonation_chain=TEST_IMPERSONATION_CHAIN, + ) + mock_hook.return_value.get_instance.assert_called_once_with( + location=TEST_LOCATION, + instance=TEST_INSTANCE_NAME, + project_id=TEST_PROJECT_ID, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + + +class TestCloudMemorystoreMemcachedListInstancesOperator(TestCase): + @mock.patch("airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedHook") + def test_assert_valid_hook_call(self, mock_hook): + task = CloudMemorystoreMemcachedListInstancesOperator( + task_id=TEST_TASK_ID, + location=TEST_LOCATION, + project_id=TEST_PROJECT_ID, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + gcp_conn_id=TEST_GCP_CONN_ID, + impersonation_chain=TEST_IMPERSONATION_CHAIN, + ) + task.execute(mock.MagicMock()) + mock_hook.assert_called_once_with( + gcp_conn_id=TEST_GCP_CONN_ID, + impersonation_chain=TEST_IMPERSONATION_CHAIN, + ) + mock_hook.return_value.list_instances.assert_called_once_with( + location=TEST_LOCATION, + project_id=TEST_PROJECT_ID, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + + +class TestCloudMemorystoreMemcachedUpdateInstanceOperator(TestCase): + @mock.patch("airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedHook") + def test_assert_valid_hook_call(self, mock_hook): + task = CloudMemorystoreMemcachedUpdateInstanceOperator( + task_id=TEST_TASK_ID, + update_mask=TEST_UPDATE_MASK_MEMCACHED, + instance=TEST_INSTANCE, + location=TEST_LOCATION, + instance_id=TEST_INSTANCE_ID, + project_id=TEST_PROJECT_ID, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + gcp_conn_id=TEST_GCP_CONN_ID, + impersonation_chain=TEST_IMPERSONATION_CHAIN, + ) + task.execute(mock.MagicMock()) + mock_hook.assert_called_once_with( + gcp_conn_id=TEST_GCP_CONN_ID, + impersonation_chain=TEST_IMPERSONATION_CHAIN, + ) + mock_hook.return_value.update_instance.assert_called_once_with( + update_mask=TEST_UPDATE_MASK_MEMCACHED, + instance=TEST_INSTANCE, + location=TEST_LOCATION, + instance_id=TEST_INSTANCE_ID, + project_id=TEST_PROJECT_ID, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) diff --git a/tests/providers/google/cloud/operators/test_cloud_memorystore_system.py b/tests/providers/google/cloud/operators/test_cloud_memorystore_system.py index 4b42143..6f56f18 100644 --- a/tests/providers/google/cloud/operators/test_cloud_memorystore_system.py +++ b/tests/providers/google/cloud/operators/test_cloud_memorystore_system.py @@ -45,8 +45,12 @@ class CloudMemorystoreSystemTest(GoogleSystemTest): self.create_gcs_bucket(GCP_BUCKET_NAME, location="europe-north1") @provide_gcp_context(GCP_MEMORYSTORE) - def test_run_example_dag(self): - self.run_dag('gcp_cloud_memorystore', CLOUD_DAG_FOLDER) + def test_run_example_dag_memorystore_redis(self): + self.run_dag('gcp_cloud_memorystore_redis', CLOUD_DAG_FOLDER) + + @provide_gcp_context(GCP_MEMORYSTORE) + def test_run_example_dag_memorystore_memcached(self): + self.run_dag('gcp_cloud_memorystore_memcached', CLOUD_DAG_FOLDER) @provide_gcp_context(GCP_MEMORYSTORE) def tearDown(self):