This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6f57ee1256 Add capability of passing a bucket to `example_emr` system 
test (#36192)
6f57ee1256 is described below

commit 6f57ee12564759892af1152606af5373d2b709c0
Author: Vincent <97131062+vincb...@users.noreply.github.com>
AuthorDate: Tue Dec 12 16:11:36 2023 -0500

    Add capability of passing a bucket to `example_emr` system test (#36192)
---
 tests/system/providers/amazon/aws/example_emr.py   | 66 ++++++++++++++--------
 .../system/providers/amazon/aws/utils/__init__.py  | 23 ++++++--
 2 files changed, 61 insertions(+), 28 deletions(-)

diff --git a/tests/system/providers/amazon/aws/example_emr.py 
b/tests/system/providers/amazon/aws/example_emr.py
index 32f10f6f1b..e131b18f3c 100644
--- a/tests/system/providers/amazon/aws/example_emr.py
+++ b/tests/system/providers/amazon/aws/example_emr.py
@@ -42,6 +42,7 @@ from tests.system.providers.amazon.aws.utils import 
ENV_ID_KEY, SystemTestContex
 DAG_ID = "example_emr"
 CONFIG_NAME = "EMR Runtime Role Security Configuration"
 EXECUTION_ROLE_ARN_KEY = "EXECUTION_ROLE_ARN"
+BUCKET_NAME_KEY = "BUCKET_NAME"
 
 SECURITY_CONFIGURATION = {
     "AuthorizationConfiguration": {
@@ -123,7 +124,12 @@ def get_step_id(step_ids: list):
     return step_ids[0]
 
 
-sys_test_context_task = 
SystemTestContextBuilder().add_variable(EXECUTION_ROLE_ARN_KEY).build()
+sys_test_context_task = (
+    SystemTestContextBuilder()
+    .add_variable(EXECUTION_ROLE_ARN_KEY)
+    .add_variable(BUCKET_NAME_KEY, optional=True)
+    .build()
+)
 
 with DAG(
     dag_id=DAG_ID,
@@ -137,14 +143,12 @@ with DAG(
     env_id = test_context[ENV_ID_KEY]
     config_name = f"{CONFIG_NAME}-{env_id}"
     execution_role_arn = test_context[EXECUTION_ROLE_ARN_KEY]
-    s3_bucket = f"{env_id}-emr-bucket"
+    s3_bucket = test_context[BUCKET_NAME_KEY] or f"{env_id}-emr-bucket"
 
     JOB_FLOW_OVERRIDES["LogUri"] = f"s3://{s3_bucket}/"
     JOB_FLOW_OVERRIDES["SecurityConfiguration"] = config_name
     JOB_FLOW_OVERRIDES["Instances"]["InstanceGroups"][0]["CustomAmiId"] = 
get_ami_id()
 
-    create_s3_bucket = S3CreateBucketOperator(task_id="create_s3_bucket", 
bucket_name=s3_bucket)
-
     create_security_configuration = configure_security_config(config_name)
 
     # [START howto_operator_emr_create_job_flow]
@@ -196,28 +200,42 @@ with DAG(
 
     delete_security_configuration = delete_security_config(config_name)
 
-    delete_s3_bucket = S3DeleteBucketOperator(
-        task_id="delete_s3_bucket",
-        bucket_name=s3_bucket,
-        force_delete=True,
-        trigger_rule=TriggerRule.ALL_DONE,
-    )
+    # There are two options:
+    # - Pass the bucket name as an argument to the system test. This bucket 
will then be used to store
+    #   the EMR-related logs.
+    # - The test itself creates and deletes the S3 bucket needed for this test.
+    create_s3_bucket: S3CreateBucketOperator | None = None
+    delete_s3_bucket: S3DeleteBucketOperator | None = None
+    if not test_context[BUCKET_NAME_KEY]:
+        create_s3_bucket = S3CreateBucketOperator(task_id="create_s3_bucket", 
bucket_name=s3_bucket)
+        delete_s3_bucket = S3DeleteBucketOperator(
+            task_id="delete_s3_bucket",
+            bucket_name=s3_bucket,
+            force_delete=True,
+            trigger_rule=TriggerRule.ALL_DONE,
+        )
 
     chain(
-        # TEST SETUP
-        test_context,
-        create_s3_bucket,
-        create_security_configuration,
-        # TEST BODY
-        create_job_flow,
-        modify_cluster,
-        add_steps,
-        wait_for_step,
-        # TEST TEARDOWN
-        remove_cluster,
-        check_job_flow,
-        delete_security_configuration,
-        delete_s3_bucket,
+        *[
+            task
+            for task in [
+                # TEST SETUP
+                test_context,
+                create_s3_bucket,
+                create_security_configuration,
+                # TEST BODY
+                create_job_flow,
+                modify_cluster,
+                add_steps,
+                wait_for_step,
+                # TEST TEARDOWN
+                remove_cluster,
+                check_job_flow,
+                delete_security_configuration,
+                delete_s3_bucket,
+            ]
+            if task is not None
+        ]
     )
 
     from tests.system.utils.watcher import watcher
diff --git a/tests/system/providers/amazon/aws/utils/__init__.py 
b/tests/system/providers/amazon/aws/utils/__init__.py
index d86abd0486..1bdcbf656f 100644
--- a/tests/system/providers/amazon/aws/utils/__init__.py
+++ b/tests/system/providers/amazon/aws/utils/__init__.py
@@ -131,6 +131,7 @@ class Variable:
         to_split: bool = False,
         delimiter: str | None = None,
         test_name: str | None = None,
+        optional: bool = False,
     ):
         self.name = name
         self.test_name = test_name
@@ -140,6 +141,8 @@ class Variable:
         elif delimiter:
             raise ValueError(f"Variable {name} has a delimiter but 
split_string is set to False.")
 
+        self.optional = optional
+
     def get_value(self):
         if hasattr(self, "default_value"):
             return self._format_value(
@@ -147,10 +150,13 @@ class Variable:
                     key=self.name,
                     default_value=self.default_value,
                     test_name=self.test_name,
+                    optional=self.optional,
                 )
             )
 
-        return self._format_value(fetch_variable(key=self.name, 
test_name=self.test_name))
+        return self._format_value(
+            fetch_variable(key=self.name, test_name=self.test_name, 
optional=self.optional)
+        )
 
     def set_default(self, default):
         # Since 'None' is a potentially valid "default" value, we are only 
creating this
@@ -182,6 +188,7 @@ class SystemTestContextBuilder:
         variable_name: str,
         split_string: bool = False,
         delimiter: str | None = None,
+        optional: bool = False,
         **kwargs,
     ):
         """Register a variable to fetch from environment or cloud parameter 
store"""
@@ -193,6 +200,7 @@ class SystemTestContextBuilder:
             to_split=split_string,
             delimiter=delimiter,
             test_name=self.test_name,
+            optional=optional,
         )
 
         # default_value is accepted via kwargs so that it is completely 
optional and no
@@ -220,7 +228,12 @@ class SystemTestContextBuilder:
         return variable_fetcher
 
 
-def fetch_variable(key: str, default_value: str | None = None, test_name: str 
| None = None) -> str:
+def fetch_variable(
+    key: str,
+    default_value: str | None = None,
+    test_name: str | None = None,
+    optional: bool = False,
+) -> str | None:
     """
     Given a Parameter name: first check for an existing Environment Variable,
     then check SSM for a value. If neither are available, fall back on the
@@ -229,11 +242,13 @@ def fetch_variable(key: str, default_value: str | None = 
None, test_name: str |
     :param key: The name of the Parameter to fetch a value for.
     :param default_value: The default value to use if no value can be found.
     :param test_name: The system test name.
+    :param optional: Whether the variable is optional. If True, no `ValueError` is raised when no value
+        can be found; the looked-up value (possibly None) is returned as-is.
     :return: The value of the parameter.
     """
 
     value: str | None = os.getenv(key, _fetch_from_ssm(key, test_name)) or 
default_value
-    if not value:
+    if not optional and not value:
         raise ValueError(NO_VALUE_MSG.format(key=key))
     return value
 
@@ -249,7 +264,7 @@ def set_env_id() -> str:
 
     :return: A valid System Test Environment ID.
     """
-    env_id: str = fetch_variable(ENV_ID_ENVIRON_KEY, DEFAULT_ENV_ID)
+    env_id: str = str(fetch_variable(ENV_ID_ENVIRON_KEY, DEFAULT_ENV_ID))
     env_id = _validate_env_id(env_id)
 
     os.environ[ENV_ID_ENVIRON_KEY] = env_id

Reply via email to