This is an automated email from the ASF dual-hosted git repository.

turbaszek pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push: new 810c15e Fix and improve GCP BigTable hook and system test (#13896) 810c15e is described below commit 810c15ed85d7bcde8d5b8bc44e1cbd4859e29d2e Author: Tobiasz Kędzierski <tobiasz.kedzier...@polidea.com> AuthorDate: Wed Jan 27 12:25:40 2021 +0100 Fix and improve GCP BigTable hook and system test (#13896) Improve environment variables in GCP BigTable system test. It will help to parametrize system tests. --- .../google/cloud/example_dags/example_bigtable.py | 32 ++++++------ airflow/providers/google/cloud/hooks/bigtable.py | 9 +++- .../providers/google/cloud/hooks/test_bigtable.py | 58 ++++++++++++++++++++-- .../google/cloud/operators/test_bigtable_system.py | 7 +-- 4 files changed, 81 insertions(+), 25 deletions(-) diff --git a/airflow/providers/google/cloud/example_dags/example_bigtable.py b/airflow/providers/google/cloud/example_dags/example_bigtable.py index ce852ed..fc62cdf 100644 --- a/airflow/providers/google/cloud/example_dags/example_bigtable.py +++ b/airflow/providers/google/cloud/example_dags/example_bigtable.py @@ -60,22 +60,22 @@ from airflow.providers.google.cloud.sensors.bigtable import BigtableTableReplica from airflow.utils.dates import days_ago GCP_PROJECT_ID = getenv('GCP_PROJECT_ID', 'example-project') -CBT_INSTANCE_ID = getenv('CBT_INSTANCE_ID', 'some-instance-id') -CBT_INSTANCE_DISPLAY_NAME = getenv('CBT_INSTANCE_DISPLAY_NAME', 'Human-readable name') +CBT_INSTANCE_ID = getenv('GCP_BIG_TABLE_INSTANCE_ID', 'some-instance-id') +CBT_INSTANCE_DISPLAY_NAME = getenv('GCP_BIG_TABLE_INSTANCE_DISPLAY_NAME', 'Human-readable name') CBT_INSTANCE_DISPLAY_NAME_UPDATED = getenv( - "CBT_INSTANCE_DISPLAY_NAME_UPDATED", "Human-readable name - updated" + "GCP_BIG_TABLE_INSTANCE_DISPLAY_NAME_UPDATED", f"{CBT_INSTANCE_DISPLAY_NAME} - updated" ) -CBT_INSTANCE_TYPE = getenv('CBT_INSTANCE_TYPE', '2') -CBT_INSTANCE_TYPE_PROD = getenv('CBT_INSTANCE_TYPE_PROD', '1') -CBT_INSTANCE_LABELS = 
getenv('CBT_INSTANCE_LABELS', '{}') -CBT_INSTANCE_LABELS_UPDATED = getenv('CBT_INSTANCE_LABELS', '{"env": "prod"}') -CBT_CLUSTER_ID = getenv('CBT_CLUSTER_ID', 'some-cluster-id') -CBT_CLUSTER_ZONE = getenv('CBT_CLUSTER_ZONE', 'europe-west1-b') -CBT_CLUSTER_NODES = getenv('CBT_CLUSTER_NODES', '3') -CBT_CLUSTER_NODES_UPDATED = getenv('CBT_CLUSTER_NODES_UPDATED', '5') -CBT_CLUSTER_STORAGE_TYPE = getenv('CBT_CLUSTER_STORAGE_TYPE', '2') -CBT_TABLE_ID = getenv('CBT_TABLE_ID', 'some-table-id') -CBT_POKE_INTERVAL = getenv('CBT_POKE_INTERVAL', '60') +CBT_INSTANCE_TYPE = getenv('GCP_BIG_TABLE_INSTANCE_TYPE', '2') +CBT_INSTANCE_TYPE_PROD = getenv('GCP_BIG_TABLE_INSTANCE_TYPE_PROD', '1') +CBT_INSTANCE_LABELS = getenv('GCP_BIG_TABLE_INSTANCE_LABELS', '{}') +CBT_INSTANCE_LABELS_UPDATED = getenv('GCP_BIG_TABLE_INSTANCE_LABELS_UPDATED', '{"env": "prod"}') +CBT_CLUSTER_ID = getenv('GCP_BIG_TABLE_CLUSTER_ID', 'some-cluster-id') +CBT_CLUSTER_ZONE = getenv('GCP_BIG_TABLE_CLUSTER_ZONE', 'europe-west1-b') +CBT_CLUSTER_NODES = getenv('GCP_BIG_TABLE_CLUSTER_NODES', '3') +CBT_CLUSTER_NODES_UPDATED = getenv('GCP_BIG_TABLE_CLUSTER_NODES_UPDATED', '5') +CBT_CLUSTER_STORAGE_TYPE = getenv('GCP_BIG_TABLE_CLUSTER_STORAGE_TYPE', '2') +CBT_TABLE_ID = getenv('GCP_BIG_TABLE_TABLE_ID', 'some-table-id') +CBT_POKE_INTERVAL = getenv('GCP_BIG_TABLE_POKE_INTERVAL', '60') with models.DAG( @@ -93,8 +93,8 @@ with models.DAG( instance_display_name=CBT_INSTANCE_DISPLAY_NAME, instance_type=int(CBT_INSTANCE_TYPE), instance_labels=json.loads(CBT_INSTANCE_LABELS), - cluster_nodes=int(CBT_CLUSTER_NODES), - cluster_storage_type=CBT_CLUSTER_STORAGE_TYPE, + cluster_nodes=None, + cluster_storage_type=int(CBT_CLUSTER_STORAGE_TYPE), task_id='create_instance_task', ) create_instance_task2 = BigtableCreateInstanceOperator( diff --git a/airflow/providers/google/cloud/hooks/bigtable.py b/airflow/providers/google/cloud/hooks/bigtable.py index c5a2fa1..60e309d 100644 --- a/airflow/providers/google/cloud/hooks/bigtable.py +++ 
b/airflow/providers/google/cloud/hooks/bigtable.py @@ -169,7 +169,14 @@ class BigtableHook(GoogleBaseHook): instance_labels, ) - clusters = [instance.cluster(main_cluster_id, main_cluster_zone, cluster_nodes, cluster_storage_type)] + cluster_kwargs = dict( + cluster_id=main_cluster_id, + location_id=main_cluster_zone, + default_storage_type=cluster_storage_type, + ) + if instance_type != enums.Instance.Type.DEVELOPMENT and cluster_nodes: + cluster_kwargs["serve_nodes"] = cluster_nodes + clusters = [instance.cluster(**cluster_kwargs)] if replica_cluster_id and replica_cluster_zone: warnings.warn( "The replica_cluster_id and replica_cluster_zone parameter have been deprecated." diff --git a/tests/providers/google/cloud/hooks/test_bigtable.py b/tests/providers/google/cloud/hooks/test_bigtable.py index a452c48..16437fe 100644 --- a/tests/providers/google/cloud/hooks/test_bigtable.py +++ b/tests/providers/google/cloud/hooks/test_bigtable.py @@ -309,7 +309,7 @@ class TestBigtableHookDefaultProjectId(unittest.TestCase): @mock.patch('google.cloud.bigtable.instance.Instance.cluster') @mock.patch('google.cloud.bigtable.instance.Instance.create') @mock.patch('airflow.providers.google.cloud.hooks.bigtable.BigtableHook._get_client') - def test_create_instance_with_one_replica_cluster( + def test_create_instance_with_one_replica_cluster_production( self, get_client, instance_create, cluster, mock_project_id ): operation = mock.Mock() @@ -325,10 +325,57 @@ class TestBigtableHookDefaultProjectId(unittest.TestCase): cluster_nodes=1, cluster_storage_type=enums.StorageType.SSD, project_id=GCP_PROJECT_ID_HOOK_UNIT_TEST, + instance_type=enums.Instance.Type.PRODUCTION, ) cluster.assert_has_calls( [ - unittest.mock.call(CBT_CLUSTER, CBT_ZONE, 1, enums.StorageType.SSD), + unittest.mock.call( + cluster_id=CBT_CLUSTER, + location_id=CBT_ZONE, + serve_nodes=1, + default_storage_type=enums.StorageType.SSD, + ), + unittest.mock.call( + CBT_REPLICA_CLUSTER_ID, CBT_REPLICA_CLUSTER_ZONE, 1, 
enums.StorageType.SSD + ), + ], + any_order=True, + ) + get_client.assert_called_once_with(project_id='example-project') + instance_create.assert_called_once_with(clusters=mock.ANY) + assert res.instance_id == 'instance' + + @mock.patch( + 'airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id', + new_callable=PropertyMock, + return_value=GCP_PROJECT_ID_HOOK_UNIT_TEST, + ) + @mock.patch('google.cloud.bigtable.instance.Instance.cluster') + @mock.patch('google.cloud.bigtable.instance.Instance.create') + @mock.patch('airflow.providers.google.cloud.hooks.bigtable.BigtableHook._get_client') + def test_create_instance_with_one_replica_cluster_development( + self, get_client, instance_create, cluster, mock_project_id + ): + operation = mock.Mock() + operation.result_return_value = Instance(instance_id=CBT_INSTANCE, client=get_client) + instance_create.return_value = operation + + res = self.bigtable_hook_default_project_id.create_instance( + instance_id=CBT_INSTANCE, + main_cluster_id=CBT_CLUSTER, + main_cluster_zone=CBT_ZONE, + replica_cluster_id=CBT_REPLICA_CLUSTER_ID, + replica_cluster_zone=CBT_REPLICA_CLUSTER_ZONE, + cluster_nodes=1, + cluster_storage_type=enums.StorageType.SSD, + project_id=GCP_PROJECT_ID_HOOK_UNIT_TEST, + instance_type=enums.Instance.Type.DEVELOPMENT, + ) + cluster.assert_has_calls( + [ + unittest.mock.call( + cluster_id=CBT_CLUSTER, location_id=CBT_ZONE, default_storage_type=enums.StorageType.SSD + ), unittest.mock.call( CBT_REPLICA_CLUSTER_ID, CBT_REPLICA_CLUSTER_ZONE, 1, enums.StorageType.SSD ), @@ -365,7 +412,12 @@ class TestBigtableHookDefaultProjectId(unittest.TestCase): ) cluster.assert_has_calls( [ - unittest.mock.call(CBT_CLUSTER, CBT_ZONE, 1, enums.StorageType.SSD), + unittest.mock.call( + cluster_id=CBT_CLUSTER, + location_id=CBT_ZONE, + serve_nodes=1, + default_storage_type=enums.StorageType.SSD, + ), unittest.mock.call('replica-1', 'us-west1-a', 1, enums.StorageType.SSD), unittest.mock.call('replica-2', 
'us-central1-f', 1, enums.StorageType.SSD), unittest.mock.call('replica-3', 'us-east1-d', 1, enums.StorageType.SSD), diff --git a/tests/providers/google/cloud/operators/test_bigtable_system.py b/tests/providers/google/cloud/operators/test_bigtable_system.py index b987731..ea83493 100644 --- a/tests/providers/google/cloud/operators/test_bigtable_system.py +++ b/tests/providers/google/cloud/operators/test_bigtable_system.py @@ -15,16 +15,13 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -import os import pytest +from airflow.providers.google.cloud.example_dags.example_bigtable import CBT_INSTANCE_ID, GCP_PROJECT_ID from tests.providers.google.cloud.utils.gcp_authenticator import GCP_BIGTABLE_KEY from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context -GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project') -CBT_INSTANCE = os.environ.get('CBT_INSTANCE_ID', 'testinstance') - @pytest.mark.backend("mysql", "postgres") @pytest.mark.credential_file(GCP_BIGTABLE_KEY) @@ -45,7 +42,7 @@ class BigTableExampleDagsSystemTest(GoogleSystemTest): '--verbosity=none', 'instances', 'delete', - CBT_INSTANCE, + CBT_INSTANCE_ID, ], key=GCP_BIGTABLE_KEY, )