This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 99816d8984f feat: add idle/auto stop TTLs and master/worker instance 
flexibility … (#65653)
99816d8984f is described below

commit 99816d8984f2b0055b25b092469ab7402a4d539a
Author: sravani-bobbala <[email protected]>
AuthorDate: Sun May 31 03:35:03 2026 +0530

    feat: add idle/auto stop TTLs and master/worker instance flexibility … 
(#65653)
    
    * feat: add idle/auto stop TTLs and master/worker instance flexibility 
policies to Dataproc cluster configuration
    
    * fixing static test failures
    
    * fixing static test failures
    
    ---------
    
    Co-authored-by: Jarek Potiuk <[email protected]>
---
 .../providers/google/cloud/operators/dataproc.py   |  43 +++++
 .../unit/google/cloud/operators/test_dataproc.py   | 210 +++++++++++++++++++++
 2 files changed, 253 insertions(+)

diff --git 
a/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py 
b/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py
index 447f6a03348..e41e9b8550c 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py
@@ -201,6 +201,13 @@ class ClusterGenerator:
     :param auto_delete_ttl: The life duration of cluster, the cluster will be
         auto-deleted at the end of this duration.
         A duration in seconds. (If auto_delete_time is set this parameter will 
be ignored)
+    :param idle_stop_ttl: The longest duration that cluster would keep alive 
while
+        staying idle. Passing this threshold will cause cluster to be 
auto-stopped.
+        A duration in seconds.
+    :param auto_stop_time:  The time when cluster will be auto-stopped.
+    :param auto_stop_ttl: The life duration of cluster, the cluster will be
+        auto-stopped at the end of this duration.
+        A duration in seconds. (If auto_stop_time is set this parameter will 
be ignored)
     :param customer_managed_key: The customer-managed key used for disk 
encryption
         
``projects/[PROJECT_STORING_KEYS]/locations/[LOCATION]/keyRings/[KEY_RING_NAME]/cryptoKeys/[KEY_NAME]``
 # noqa
     :param enable_component_gateway: Provides access to the web interfaces of 
default and selected optional
@@ -210,6 +217,8 @@ class ClusterGenerator:
         identify the driver group in future operations, such as resizing the 
node group.
     :param secondary_worker_instance_flexibility_policy: Instance flexibility 
Policy allowing a mixture of VM
         shapes and provisioning models.
+    :param master_instance_flexibility_policy: Instance flexibility Policy for 
master nodes.
+    :param worker_instance_flexibility_policy: Instance flexibility Policy for 
worker nodes.
     :param secondary_worker_accelerator_type: Type of the accelerator card 
(GPU) to attach to the secondary workers,
         see 
https://cloud.google.com/dataproc/docs/reference/rest/v1/InstanceGroupConfig#acceleratorconfig
     :param secondary_worker_accelerator_count: Number of accelerator cards 
(GPUs) to attach to the secondary workers
@@ -257,11 +266,16 @@ class ClusterGenerator:
         idle_delete_ttl: int | None = None,
         auto_delete_time: datetime | None = None,
         auto_delete_ttl: int | None = None,
+        idle_stop_ttl: int | None = None,
+        auto_stop_time: datetime | None = None,
+        auto_stop_ttl: int | None = None,
         customer_managed_key: str | None = None,
         enable_component_gateway: bool | None = False,
         driver_pool_size: int = 0,
         driver_pool_id: str | None = None,
         secondary_worker_instance_flexibility_policy: 
InstanceFlexibilityPolicy | None = None,
+        master_instance_flexibility_policy: InstanceFlexibilityPolicy | None = 
None,
+        worker_instance_flexibility_policy: InstanceFlexibilityPolicy | None = 
None,
         secondary_worker_accelerator_type: str | None = None,
         secondary_worker_accelerator_count: int | None = None,
         *,
@@ -307,12 +321,17 @@ class ClusterGenerator:
         self.idle_delete_ttl = idle_delete_ttl
         self.auto_delete_time = auto_delete_time
         self.auto_delete_ttl = auto_delete_ttl
+        self.idle_stop_ttl = idle_stop_ttl
+        self.auto_stop_time = auto_stop_time
+        self.auto_stop_ttl = auto_stop_ttl
         self.customer_managed_key = customer_managed_key
         self.enable_component_gateway = enable_component_gateway
         self.single_node = num_workers == 0
         self.driver_pool_size = driver_pool_size
         self.driver_pool_id = driver_pool_id
         self.secondary_worker_instance_flexibility_policy = 
secondary_worker_instance_flexibility_policy
+        self.master_instance_flexibility_policy = 
master_instance_flexibility_policy
+        self.worker_instance_flexibility_policy = 
worker_instance_flexibility_policy
         self.secondary_worker_accelerator_type = 
secondary_worker_accelerator_type
         self.secondary_worker_accelerator_count = 
secondary_worker_accelerator_count
         self.cluster_tier = cluster_tier
@@ -404,6 +423,17 @@ class ClusterGenerator:
         elif self.auto_delete_ttl:
             cluster_data[lifecycle_config]["auto_delete_ttl"] = {"seconds": 
int(self.auto_delete_ttl)}
 
+        if self.idle_stop_ttl:
+            cluster_data[lifecycle_config]["idle_stop_ttl"] = {"seconds": 
self.idle_stop_ttl}
+
+        if self.auto_stop_time:
+            utc_auto_stop_time = timezone.convert_to_utc(self.auto_stop_time)
+            cluster_data[lifecycle_config]["auto_stop_time"] = 
utc_auto_stop_time.strftime(
+                "%Y-%m-%dT%H:%M:%S.%fZ"
+            )
+        elif self.auto_stop_ttl:
+            cluster_data[lifecycle_config]["auto_stop_ttl"] = {"seconds": 
int(self.auto_stop_ttl)}
+
         return cluster_data
 
     def _build_driver_pool(self):
@@ -454,6 +484,19 @@ class ClusterGenerator:
             "autoscaling_config": {},
             "endpoint_config": {},
         }
+        if self.master_instance_flexibility_policy:
+            cluster_data["master_config"]["instance_flexibility_policy"] = {
+                "instance_selection_list": [
+                    vars(s) for s in 
self.master_instance_flexibility_policy.instance_selection_list
+                ]
+            }
+
+        if self.worker_instance_flexibility_policy:
+            cluster_data["worker_config"]["instance_flexibility_policy"] = {
+                "instance_selection_list": [
+                    vars(s) for s in 
self.worker_instance_flexibility_policy.instance_selection_list
+                ]
+            }
 
         if self.min_num_workers:
             cluster_data["worker_config"]["min_num_instances"] = 
self.min_num_workers
diff --git 
a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py 
b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
index ecaeae1ca3b..7dbe93033fc 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
@@ -325,6 +325,120 @@ CONFIG_WITH_GPU_ACCELERATOR = {
     "endpoint_config": {},
 }
 
+CONFIG_WITH_MASTER_AND_WORKER_FLEX_MIG = {
+    "gce_cluster_config": {
+        "zone_uri": 
"https://www.googleapis.com/compute/v1/projects/project_id/zones/zone",
+        "metadata": {"metadata": "data"},
+        "network_uri": "network_uri",
+        "subnetwork_uri": "subnetwork_uri",
+        "internal_ip_only": True,
+        "tags": ["tags"],
+        "service_account": "service_account",
+        "service_account_scopes": ["service_account_scopes"],
+    },
+    "master_config": {
+        "num_instances": 2,
+        "machine_type_uri": 
"projects/project_id/zones/zone/machineTypes/master_machine_type",
+        "disk_config": {"boot_disk_type": "master_disk_type", 
"boot_disk_size_gb": 128},
+        "image_uri": "https://www.googleapis.com/compute/beta/projects/"
+        "custom_image_project_id/global/images/custom_image",
+        "instance_flexibility_policy": {
+            "instance_selection_list": [
+                {
+                    "machine_types": [
+                        "projects/project_id/zones/zone/machineTypes/machine1",
+                        "projects/project_id/zones/zone/machineTypes/machine2",
+                    ],
+                    "rank": 0,
+                },
+                {"machine_types": 
["projects/project_id/zones/zone/machineTypes/machine3"], "rank": 1},
+            ],
+        },
+    },
+    "worker_config": {
+        "num_instances": 3,
+        "machine_type_uri": 
"projects/project_id/zones/zone/machineTypes/worker_machine_type",
+        "disk_config": {"boot_disk_type": "worker_disk_type", 
"boot_disk_size_gb": 256},
+        "image_uri": "https://www.googleapis.com/compute/beta/projects/"
+        "custom_image_project_id/global/images/custom_image",
+        "instance_flexibility_policy": {
+            "instance_selection_list": [
+                {
+                    "machine_types": [
+                        "projects/project_id/zones/zone/machineTypes/machine1",
+                        "projects/project_id/zones/zone/machineTypes/machine2",
+                    ],
+                    "rank": 0,
+                },
+                {"machine_types": 
["projects/project_id/zones/zone/machineTypes/machine3"], "rank": 1},
+            ],
+        },
+    },
+    "secondary_worker_config": {
+        "num_instances": 4,
+        "machine_type_uri": 
"projects/project_id/zones/zone/machineTypes/worker_machine_type",
+        "disk_config": {"boot_disk_type": "worker_disk_type", 
"boot_disk_size_gb": 256},
+        "is_preemptible": True,
+        "preemptibility": "SPOT",
+    },
+    "software_config": {"properties": {"properties": "data"}, 
"optional_components": ["optional_components"]},
+    "lifecycle_config": {},
+    "encryption_config": {"gce_pd_kms_key_name": "customer_managed_key"},
+    "autoscaling_config": {"policy_uri": "autoscaling_policy"},
+    "config_bucket": "storage_bucket",
+    "initialization_actions": [
+        {"executable_file": "init_actions_uris", "execution_timeout": 
{"seconds": 600}}
+    ],
+    "endpoint_config": {},
+}
+
+
+CONFIG_WITH_STOP_TTL = {
+    "gce_cluster_config": {
+        "zone_uri": 
"https://www.googleapis.com/compute/v1/projects/project_id/zones/zone",
+        "metadata": {"metadata": "data"},
+        "network_uri": "network_uri",
+        "subnetwork_uri": "subnetwork_uri",
+        "internal_ip_only": True,
+        "tags": ["tags"],
+        "service_account": "service_account",
+        "service_account_scopes": ["service_account_scopes"],
+    },
+    "master_config": {
+        "num_instances": 2,
+        "machine_type_uri": 
"projects/project_id/zones/zone/machineTypes/master_machine_type",
+        "disk_config": {"boot_disk_type": "master_disk_type", 
"boot_disk_size_gb": 128},
+        "image_uri": "https://www.googleapis.com/compute/beta/projects/"
+        "custom_image_project_id/global/images/custom_image",
+    },
+    "worker_config": {
+        "num_instances": 2,
+        "machine_type_uri": 
"projects/project_id/zones/zone/machineTypes/worker_machine_type",
+        "disk_config": {"boot_disk_type": "worker_disk_type", 
"boot_disk_size_gb": 256},
+        "image_uri": "https://www.googleapis.com/compute/beta/projects/"
+        "custom_image_project_id/global/images/custom_image",
+    },
+    "secondary_worker_config": {
+        "num_instances": 4,
+        "machine_type_uri": 
"projects/project_id/zones/zone/machineTypes/worker_machine_type",
+        "disk_config": {"boot_disk_type": "worker_disk_type", 
"boot_disk_size_gb": 256},
+        "is_preemptible": True,
+        "preemptibility": "SPOT",
+    },
+    "software_config": {"properties": {"properties": "data"}, 
"optional_components": ["optional_components"]},
+    "lifecycle_config": {
+        "idle_stop_ttl": {"seconds": 300},
+        "auto_stop_time": "2019-09-12T00:00:00.000000Z",
+    },
+    "encryption_config": {"gce_pd_kms_key_name": "customer_managed_key"},
+    "autoscaling_config": {"policy_uri": "autoscaling_policy"},
+    "config_bucket": "storage_bucket",
+    "initialization_actions": [
+        {"executable_file": "init_actions_uris", "execution_timeout": 
{"seconds": 600}}
+    ],
+    "endpoint_config": {},
+}
+
 LABELS = {"labels": "data", "airflow-version": AIRFLOW_VERSION_LABEL}
 
 LABELS.update({"airflow-version": AIRFLOW_VERSION_LABEL})
@@ -699,6 +813,102 @@ class TestsClusterGenerator:
         cluster = generator.make()
         assert cluster == CONFIG_WITH_FLEX_MIG
 
+    def test_build_with_master_and_worker_flex_migs(self):
+        generator = ClusterGenerator(
+            project_id="project_id",
+            num_workers=3,
+            zone="zone",
+            network_uri="network_uri",
+            subnetwork_uri="subnetwork_uri",
+            internal_ip_only=True,
+            tags=["tags"],
+            storage_bucket="storage_bucket",
+            init_actions_uris=["init_actions_uris"],
+            init_action_timeout="10m",
+            metadata={"metadata": "data"},
+            custom_image="custom_image",
+            custom_image_project_id="custom_image_project_id",
+            autoscaling_policy="autoscaling_policy",
+            properties={"properties": "data"},
+            optional_components=["optional_components"],
+            num_masters=2,
+            master_machine_type="master_machine_type",
+            master_disk_type="master_disk_type",
+            master_disk_size=128,
+            worker_machine_type="worker_machine_type",
+            worker_disk_type="worker_disk_type",
+            worker_disk_size=256,
+            num_preemptible_workers=4,
+            preemptibility="Spot",
+            region="region",
+            service_account="service_account",
+            service_account_scopes=["service_account_scopes"],
+            customer_managed_key="customer_managed_key",
+            master_instance_flexibility_policy=InstanceFlexibilityPolicy(
+                [
+                    InstanceSelection(
+                        [
+                            
"projects/project_id/zones/zone/machineTypes/machine1",
+                            
"projects/project_id/zones/zone/machineTypes/machine2",
+                        ],
+                        0,
+                    ),
+                    
InstanceSelection(["projects/project_id/zones/zone/machineTypes/machine3"], 1),
+                ]
+            ),
+            worker_instance_flexibility_policy=InstanceFlexibilityPolicy(
+                [
+                    InstanceSelection(
+                        [
+                            
"projects/project_id/zones/zone/machineTypes/machine1",
+                            
"projects/project_id/zones/zone/machineTypes/machine2",
+                        ],
+                        0,
+                    ),
+                    
InstanceSelection(["projects/project_id/zones/zone/machineTypes/machine3"], 1),
+                ]
+            ),
+        )
+        cluster = generator.make()
+        assert cluster == CONFIG_WITH_MASTER_AND_WORKER_FLEX_MIG
+
+    def test_build_with_stop_ttl(self):
+        generator = ClusterGenerator(
+            project_id="project_id",
+            num_workers=2,
+            zone="zone",
+            network_uri="network_uri",
+            subnetwork_uri="subnetwork_uri",
+            internal_ip_only=True,
+            tags=["tags"],
+            storage_bucket="storage_bucket",
+            init_actions_uris=["init_actions_uris"],
+            init_action_timeout="10m",
+            metadata={"metadata": "data"},
+            custom_image="custom_image",
+            custom_image_project_id="custom_image_project_id",
+            autoscaling_policy="autoscaling_policy",
+            properties={"properties": "data"},
+            optional_components=["optional_components"],
+            num_masters=2,
+            master_machine_type="master_machine_type",
+            master_disk_type="master_disk_type",
+            master_disk_size=128,
+            worker_machine_type="worker_machine_type",
+            worker_disk_type="worker_disk_type",
+            worker_disk_size=256,
+            num_preemptible_workers=4,
+            preemptibility="Spot",
+            region="region",
+            service_account="service_account",
+            service_account_scopes=["service_account_scopes"],
+            idle_stop_ttl=300,
+            auto_stop_time=timezone.datetime(2019, 9, 12),
+            customer_managed_key="customer_managed_key",
+        )
+        cluster = generator.make()
+        assert cluster == CONFIG_WITH_STOP_TTL
+
     def test_build_with_gpu_accelerator(self):
         generator = ClusterGenerator(
             project_id="project_id",

Reply via email to