This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 6f57ee1256 Add capability of passing a bucket to `example_emr` system test (#36192) 6f57ee1256 is described below commit 6f57ee12564759892af1152606af5373d2b709c0 Author: Vincent <97131062+vincb...@users.noreply.github.com> AuthorDate: Tue Dec 12 16:11:36 2023 -0500 Add capability of passing a bucket to `example_emr` system test (#36192) --- tests/system/providers/amazon/aws/example_emr.py | 66 ++++++++++++++-------- .../system/providers/amazon/aws/utils/__init__.py | 23 ++++++-- 2 files changed, 61 insertions(+), 28 deletions(-) diff --git a/tests/system/providers/amazon/aws/example_emr.py b/tests/system/providers/amazon/aws/example_emr.py index 32f10f6f1b..e131b18f3c 100644 --- a/tests/system/providers/amazon/aws/example_emr.py +++ b/tests/system/providers/amazon/aws/example_emr.py @@ -42,6 +42,7 @@ from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContex DAG_ID = "example_emr" CONFIG_NAME = "EMR Runtime Role Security Configuration" EXECUTION_ROLE_ARN_KEY = "EXECUTION_ROLE_ARN" +BUCKET_NAME_KEY = "BUCKET_NAME" SECURITY_CONFIGURATION = { "AuthorizationConfiguration": { @@ -123,7 +124,12 @@ def get_step_id(step_ids: list): return step_ids[0] -sys_test_context_task = SystemTestContextBuilder().add_variable(EXECUTION_ROLE_ARN_KEY).build() +sys_test_context_task = ( + SystemTestContextBuilder() + .add_variable(EXECUTION_ROLE_ARN_KEY) + .add_variable(BUCKET_NAME_KEY, optional=True) + .build() +) with DAG( dag_id=DAG_ID, @@ -137,14 +143,12 @@ with DAG( env_id = test_context[ENV_ID_KEY] config_name = f"{CONFIG_NAME}-{env_id}" execution_role_arn = test_context[EXECUTION_ROLE_ARN_KEY] - s3_bucket = f"{env_id}-emr-bucket" + s3_bucket = test_context[BUCKET_NAME_KEY] or f"{env_id}-emr-bucket" JOB_FLOW_OVERRIDES["LogUri"] = f"s3://{s3_bucket}/" JOB_FLOW_OVERRIDES["SecurityConfiguration"] = config_name JOB_FLOW_OVERRIDES["Instances"]["InstanceGroups"][0]["CustomAmiId"] = get_ami_id() 
- create_s3_bucket = S3CreateBucketOperator(task_id="create_s3_bucket", bucket_name=s3_bucket) - create_security_configuration = configure_security_config(config_name) # [START howto_operator_emr_create_job_flow] @@ -196,28 +200,42 @@ with DAG( delete_security_configuration = delete_security_config(config_name) - delete_s3_bucket = S3DeleteBucketOperator( - task_id="delete_s3_bucket", - bucket_name=s3_bucket, - force_delete=True, - trigger_rule=TriggerRule.ALL_DONE, - ) + # There are two options: + # - Pass the bucket name as an argument to the system test. This bucket will then be used to store + # the EMR-related logs. + # - The test itself creates and deletes the S3 bucket needed for this test. + create_s3_bucket: S3CreateBucketOperator | None = None + delete_s3_bucket: S3DeleteBucketOperator | None = None + if not test_context[BUCKET_NAME_KEY]: + create_s3_bucket = S3CreateBucketOperator(task_id="create_s3_bucket", bucket_name=s3_bucket) + delete_s3_bucket = S3DeleteBucketOperator( + task_id="delete_s3_bucket", + bucket_name=s3_bucket, + force_delete=True, + trigger_rule=TriggerRule.ALL_DONE, + ) chain( - # TEST SETUP - test_context, - create_s3_bucket, - create_security_configuration, - # TEST BODY - create_job_flow, - modify_cluster, - add_steps, - wait_for_step, - # TEST TEARDOWN - remove_cluster, - check_job_flow, - delete_security_configuration, - delete_s3_bucket, + *[ + task + for task in [ + # TEST SETUP + test_context, + create_s3_bucket, + create_security_configuration, + # TEST BODY + create_job_flow, + modify_cluster, + add_steps, + wait_for_step, + # TEST TEARDOWN + remove_cluster, + check_job_flow, + delete_security_configuration, + delete_s3_bucket, + ] + if task is not None + ] ) from tests.system.utils.watcher import watcher diff --git a/tests/system/providers/amazon/aws/utils/__init__.py b/tests/system/providers/amazon/aws/utils/__init__.py index d86abd0486..1bdcbf656f 100644 --- a/tests/system/providers/amazon/aws/utils/__init__.py +++ 
b/tests/system/providers/amazon/aws/utils/__init__.py @@ -131,6 +131,7 @@ class Variable: to_split: bool = False, delimiter: str | None = None, test_name: str | None = None, + optional: bool = False, ): self.name = name self.test_name = test_name @@ -140,6 +141,8 @@ class Variable: elif delimiter: raise ValueError(f"Variable {name} has a delimiter but split_string is set to False.") + self.optional = optional + def get_value(self): if hasattr(self, "default_value"): return self._format_value( @@ -147,10 +150,13 @@ class Variable: key=self.name, default_value=self.default_value, test_name=self.test_name, + optional=self.optional, ) ) - return self._format_value(fetch_variable(key=self.name, test_name=self.test_name)) + return self._format_value( + fetch_variable(key=self.name, test_name=self.test_name, optional=self.optional) + ) def set_default(self, default): # Since 'None' is a potentially valid "default" value, we are only creating this @@ -182,6 +188,7 @@ class SystemTestContextBuilder: variable_name: str, split_string: bool = False, delimiter: str | None = None, + optional: bool = False, **kwargs, ): """Register a variable to fetch from environment or cloud parameter store""" @@ -193,6 +200,7 @@ class SystemTestContextBuilder: to_split=split_string, delimiter=delimiter, test_name=self.test_name, + optional=optional, ) # default_value is accepted via kwargs so that it is completely optional and no @@ -220,7 +228,12 @@ class SystemTestContextBuilder: return variable_fetcher -def fetch_variable(key: str, default_value: str | None = None, test_name: str | None = None) -> str: +def fetch_variable( + key: str, + default_value: str | None = None, + test_name: str | None = None, + optional: bool = False, +) -> str | None: """ Given a Parameter name: first check for an existing Environment Variable, then check SSM for a value. 
If neither are available, fall back on the @@ -229,11 +242,13 @@ def fetch_variable(key: str, default_value: str | None = None, test_name: str | :param key: The name of the Parameter to fetch a value for. :param default_value: The default value to use if no value can be found. :param test_name: The system test name. + :param optional: Whether the variable is optional. If True, does not raise `ValueError` if the variable + does not exist. :return: The value of the parameter. """ value: str | None = os.getenv(key, _fetch_from_ssm(key, test_name)) or default_value - if not value: + if not optional and not value: raise ValueError(NO_VALUE_MSG.format(key=key)) return value @@ -249,7 +264,7 @@ def set_env_id() -> str: :return: A valid System Test Environment ID. """ - env_id: str = fetch_variable(ENV_ID_ENVIRON_KEY, DEFAULT_ENV_ID) + env_id: str = str(fetch_variable(ENV_ID_ENVIRON_KEY, DEFAULT_ENV_ID)) env_id = _validate_env_id(env_id) os.environ[ENV_ID_ENVIRON_KEY] = env_id