This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 99816d8984f feat: add idle/auto stop TTLs and master/worker instance
flexibility policies to Dataproc cluster configuration (#65653)
99816d8984f is described below
commit 99816d8984f2b0055b25b092469ab7402a4d539a
Author: sravani-bobbala <[email protected]>
AuthorDate: Sun May 31 03:35:03 2026 +0530
feat: add idle/auto stop TTLs and master/worker instance flexibility
policies to Dataproc cluster configuration (#65653)
* feat: add idle/auto stop TTLs and master/worker instance flexibility
policies to Dataproc cluster configuration
* fixing static test failures
* fixing static test failures
---------
Co-authored-by: Jarek Potiuk <[email protected]>
---
.../providers/google/cloud/operators/dataproc.py | 43 +++++
.../unit/google/cloud/operators/test_dataproc.py | 210 +++++++++++++++++++++
2 files changed, 253 insertions(+)
diff --git
a/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py
b/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py
index 447f6a03348..e41e9b8550c 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py
@@ -201,6 +201,13 @@ class ClusterGenerator:
:param auto_delete_ttl: The life duration of cluster, the cluster will be
auto-deleted at the end of this duration.
A duration in seconds. (If auto_delete_time is set this parameter will
be ignored)
+ :param idle_stop_ttl: The longest duration that cluster would keep alive
while
+ staying idle. Passing this threshold will cause cluster to be
auto-stopped.
+ A duration in seconds.
+ :param auto_stop_time: The time when cluster will be auto-stopped.
+ :param auto_stop_ttl: The life duration of cluster, the cluster will be
+ auto-stopped at the end of this duration.
+ A duration in seconds. (If auto_stop_time is set this parameter will
be ignored)
:param customer_managed_key: The customer-managed key used for disk
encryption
``projects/[PROJECT_STORING_KEYS]/locations/[LOCATION]/keyRings/[KEY_RING_NAME]/cryptoKeys/[KEY_NAME]``
# noqa
:param enable_component_gateway: Provides access to the web interfaces of
default and selected optional
@@ -210,6 +217,8 @@ class ClusterGenerator:
identify the driver group in future operations, such as resizing the
node group.
:param secondary_worker_instance_flexibility_policy: Instance flexibility
Policy allowing a mixture of VM
shapes and provisioning models.
+ :param master_instance_flexibility_policy: Instance flexibility Policy for
master nodes.
+ :param worker_instance_flexibility_policy: Instance flexibility Policy for
worker nodes.
:param secondary_worker_accelerator_type: Type of the accelerator card
(GPU) to attach to the secondary workers,
see
https://cloud.google.com/dataproc/docs/reference/rest/v1/InstanceGroupConfig#acceleratorconfig
:param secondary_worker_accelerator_count: Number of accelerator cards
(GPUs) to attach to the secondary workers
@@ -257,11 +266,16 @@ class ClusterGenerator:
idle_delete_ttl: int | None = None,
auto_delete_time: datetime | None = None,
auto_delete_ttl: int | None = None,
+ idle_stop_ttl: int | None = None,
+ auto_stop_time: datetime | None = None,
+ auto_stop_ttl: int | None = None,
customer_managed_key: str | None = None,
enable_component_gateway: bool | None = False,
driver_pool_size: int = 0,
driver_pool_id: str | None = None,
secondary_worker_instance_flexibility_policy:
InstanceFlexibilityPolicy | None = None,
+ master_instance_flexibility_policy: InstanceFlexibilityPolicy | None =
None,
+ worker_instance_flexibility_policy: InstanceFlexibilityPolicy | None =
None,
secondary_worker_accelerator_type: str | None = None,
secondary_worker_accelerator_count: int | None = None,
*,
@@ -307,12 +321,17 @@ class ClusterGenerator:
self.idle_delete_ttl = idle_delete_ttl
self.auto_delete_time = auto_delete_time
self.auto_delete_ttl = auto_delete_ttl
+ self.idle_stop_ttl = idle_stop_ttl
+ self.auto_stop_time = auto_stop_time
+ self.auto_stop_ttl = auto_stop_ttl
self.customer_managed_key = customer_managed_key
self.enable_component_gateway = enable_component_gateway
self.single_node = num_workers == 0
self.driver_pool_size = driver_pool_size
self.driver_pool_id = driver_pool_id
self.secondary_worker_instance_flexibility_policy =
secondary_worker_instance_flexibility_policy
+ self.master_instance_flexibility_policy =
master_instance_flexibility_policy
+ self.worker_instance_flexibility_policy =
worker_instance_flexibility_policy
self.secondary_worker_accelerator_type =
secondary_worker_accelerator_type
self.secondary_worker_accelerator_count =
secondary_worker_accelerator_count
self.cluster_tier = cluster_tier
@@ -404,6 +423,17 @@ class ClusterGenerator:
elif self.auto_delete_ttl:
cluster_data[lifecycle_config]["auto_delete_ttl"] = {"seconds":
int(self.auto_delete_ttl)}
+ if self.idle_stop_ttl:
+ cluster_data[lifecycle_config]["idle_stop_ttl"] = {"seconds":
self.idle_stop_ttl}
+
+ if self.auto_stop_time:
+ utc_auto_stop_time = timezone.convert_to_utc(self.auto_stop_time)
+ cluster_data[lifecycle_config]["auto_stop_time"] =
utc_auto_stop_time.strftime(
+ "%Y-%m-%dT%H:%M:%S.%fZ"
+ )
+ elif self.auto_stop_ttl:
+ cluster_data[lifecycle_config]["auto_stop_ttl"] = {"seconds":
int(self.auto_stop_ttl)}
+
return cluster_data
def _build_driver_pool(self):
@@ -454,6 +484,19 @@ class ClusterGenerator:
"autoscaling_config": {},
"endpoint_config": {},
}
+ if self.master_instance_flexibility_policy:
+ cluster_data["master_config"]["instance_flexibility_policy"] = {
+ "instance_selection_list": [
+ vars(s) for s in
self.master_instance_flexibility_policy.instance_selection_list
+ ]
+ }
+
+ if self.worker_instance_flexibility_policy:
+ cluster_data["worker_config"]["instance_flexibility_policy"] = {
+ "instance_selection_list": [
+ vars(s) for s in
self.worker_instance_flexibility_policy.instance_selection_list
+ ]
+ }
if self.min_num_workers:
cluster_data["worker_config"]["min_num_instances"] =
self.min_num_workers
diff --git
a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
index ecaeae1ca3b..7dbe93033fc 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
@@ -325,6 +325,120 @@ CONFIG_WITH_GPU_ACCELERATOR = {
"endpoint_config": {},
}
+CONFIG_WITH_MASTER_AND_WORKER_FLEX_MIG = {
+ "gce_cluster_config": {
+ "zone_uri":
"https://www.googleapis.com/compute/v1/projects/project_id/zones/zone",
+ "metadata": {"metadata": "data"},
+ "network_uri": "network_uri",
+ "subnetwork_uri": "subnetwork_uri",
+ "internal_ip_only": True,
+ "tags": ["tags"],
+ "service_account": "service_account",
+ "service_account_scopes": ["service_account_scopes"],
+ },
+ "master_config": {
+ "num_instances": 2,
+ "machine_type_uri":
"projects/project_id/zones/zone/machineTypes/master_machine_type",
+ "disk_config": {"boot_disk_type": "master_disk_type",
"boot_disk_size_gb": 128},
+ "image_uri": "https://www.googleapis.com/compute/beta/projects/"
+ "custom_image_project_id/global/images/custom_image",
+ "instance_flexibility_policy": {
+ "instance_selection_list": [
+ {
+ "machine_types": [
+ "projects/project_id/zones/zone/machineTypes/machine1",
+ "projects/project_id/zones/zone/machineTypes/machine2",
+ ],
+ "rank": 0,
+ },
+ {"machine_types":
["projects/project_id/zones/zone/machineTypes/machine3"], "rank": 1},
+ ],
+ },
+ },
+ "worker_config": {
+ "num_instances": 3,
+ "machine_type_uri":
"projects/project_id/zones/zone/machineTypes/worker_machine_type",
+ "disk_config": {"boot_disk_type": "worker_disk_type",
"boot_disk_size_gb": 256},
+ "image_uri": "https://www.googleapis.com/compute/beta/projects/"
+ "custom_image_project_id/global/images/custom_image",
+ "instance_flexibility_policy": {
+ "instance_selection_list": [
+ {
+ "machine_types": [
+ "projects/project_id/zones/zone/machineTypes/machine1",
+ "projects/project_id/zones/zone/machineTypes/machine2",
+ ],
+ "rank": 0,
+ },
+ {"machine_types":
["projects/project_id/zones/zone/machineTypes/machine3"], "rank": 1},
+ ],
+ },
+ },
+ "secondary_worker_config": {
+ "num_instances": 4,
+ "machine_type_uri":
"projects/project_id/zones/zone/machineTypes/worker_machine_type",
+ "disk_config": {"boot_disk_type": "worker_disk_type",
"boot_disk_size_gb": 256},
+ "is_preemptible": True,
+ "preemptibility": "SPOT",
+ },
+ "software_config": {"properties": {"properties": "data"},
"optional_components": ["optional_components"]},
+ "lifecycle_config": {},
+ "encryption_config": {"gce_pd_kms_key_name": "customer_managed_key"},
+ "autoscaling_config": {"policy_uri": "autoscaling_policy"},
+ "config_bucket": "storage_bucket",
+ "initialization_actions": [
+ {"executable_file": "init_actions_uris", "execution_timeout":
{"seconds": 600}}
+ ],
+ "endpoint_config": {},
+}
+
+
+CONFIG_WITH_STOP_TTL = {
+ "gce_cluster_config": {
+ "zone_uri":
"https://www.googleapis.com/compute/v1/projects/project_id/zones/zone",
+ "metadata": {"metadata": "data"},
+ "network_uri": "network_uri",
+ "subnetwork_uri": "subnetwork_uri",
+ "internal_ip_only": True,
+ "tags": ["tags"],
+ "service_account": "service_account",
+ "service_account_scopes": ["service_account_scopes"],
+ },
+ "master_config": {
+ "num_instances": 2,
+ "machine_type_uri":
"projects/project_id/zones/zone/machineTypes/master_machine_type",
+ "disk_config": {"boot_disk_type": "master_disk_type",
"boot_disk_size_gb": 128},
+ "image_uri": "https://www.googleapis.com/compute/beta/projects/"
+ "custom_image_project_id/global/images/custom_image",
+ },
+ "worker_config": {
+ "num_instances": 2,
+ "machine_type_uri":
"projects/project_id/zones/zone/machineTypes/worker_machine_type",
+ "disk_config": {"boot_disk_type": "worker_disk_type",
"boot_disk_size_gb": 256},
+ "image_uri": "https://www.googleapis.com/compute/beta/projects/"
+ "custom_image_project_id/global/images/custom_image",
+ },
+ "secondary_worker_config": {
+ "num_instances": 4,
+ "machine_type_uri":
"projects/project_id/zones/zone/machineTypes/worker_machine_type",
+ "disk_config": {"boot_disk_type": "worker_disk_type",
"boot_disk_size_gb": 256},
+ "is_preemptible": True,
+ "preemptibility": "SPOT",
+ },
+ "software_config": {"properties": {"properties": "data"},
"optional_components": ["optional_components"]},
+ "lifecycle_config": {
+ "idle_stop_ttl": {"seconds": 300},
+ "auto_stop_time": "2019-09-12T00:00:00.000000Z",
+ },
+ "encryption_config": {"gce_pd_kms_key_name": "customer_managed_key"},
+ "autoscaling_config": {"policy_uri": "autoscaling_policy"},
+ "config_bucket": "storage_bucket",
+ "initialization_actions": [
+ {"executable_file": "init_actions_uris", "execution_timeout":
{"seconds": 600}}
+ ],
+ "endpoint_config": {},
+}
+
LABELS = {"labels": "data", "airflow-version": AIRFLOW_VERSION_LABEL}
LABELS.update({"airflow-version": AIRFLOW_VERSION_LABEL})
@@ -699,6 +813,102 @@ class TestsClusterGenerator:
cluster = generator.make()
assert cluster == CONFIG_WITH_FLEX_MIG
+ def test_build_with_master_and_worker_flex_migs(self):
+ generator = ClusterGenerator(
+ project_id="project_id",
+ num_workers=3,
+ zone="zone",
+ network_uri="network_uri",
+ subnetwork_uri="subnetwork_uri",
+ internal_ip_only=True,
+ tags=["tags"],
+ storage_bucket="storage_bucket",
+ init_actions_uris=["init_actions_uris"],
+ init_action_timeout="10m",
+ metadata={"metadata": "data"},
+ custom_image="custom_image",
+ custom_image_project_id="custom_image_project_id",
+ autoscaling_policy="autoscaling_policy",
+ properties={"properties": "data"},
+ optional_components=["optional_components"],
+ num_masters=2,
+ master_machine_type="master_machine_type",
+ master_disk_type="master_disk_type",
+ master_disk_size=128,
+ worker_machine_type="worker_machine_type",
+ worker_disk_type="worker_disk_type",
+ worker_disk_size=256,
+ num_preemptible_workers=4,
+ preemptibility="Spot",
+ region="region",
+ service_account="service_account",
+ service_account_scopes=["service_account_scopes"],
+ customer_managed_key="customer_managed_key",
+ master_instance_flexibility_policy=InstanceFlexibilityPolicy(
+ [
+ InstanceSelection(
+ [
+
"projects/project_id/zones/zone/machineTypes/machine1",
+
"projects/project_id/zones/zone/machineTypes/machine2",
+ ],
+ 0,
+ ),
+
InstanceSelection(["projects/project_id/zones/zone/machineTypes/machine3"], 1),
+ ]
+ ),
+ worker_instance_flexibility_policy=InstanceFlexibilityPolicy(
+ [
+ InstanceSelection(
+ [
+
"projects/project_id/zones/zone/machineTypes/machine1",
+
"projects/project_id/zones/zone/machineTypes/machine2",
+ ],
+ 0,
+ ),
+
InstanceSelection(["projects/project_id/zones/zone/machineTypes/machine3"], 1),
+ ]
+ ),
+ )
+ cluster = generator.make()
+ assert cluster == CONFIG_WITH_MASTER_AND_WORKER_FLEX_MIG
+
+ def test_build_with_stop_ttl(self):
+ generator = ClusterGenerator(
+ project_id="project_id",
+ num_workers=2,
+ zone="zone",
+ network_uri="network_uri",
+ subnetwork_uri="subnetwork_uri",
+ internal_ip_only=True,
+ tags=["tags"],
+ storage_bucket="storage_bucket",
+ init_actions_uris=["init_actions_uris"],
+ init_action_timeout="10m",
+ metadata={"metadata": "data"},
+ custom_image="custom_image",
+ custom_image_project_id="custom_image_project_id",
+ autoscaling_policy="autoscaling_policy",
+ properties={"properties": "data"},
+ optional_components=["optional_components"],
+ num_masters=2,
+ master_machine_type="master_machine_type",
+ master_disk_type="master_disk_type",
+ master_disk_size=128,
+ worker_machine_type="worker_machine_type",
+ worker_disk_type="worker_disk_type",
+ worker_disk_size=256,
+ num_preemptible_workers=4,
+ preemptibility="Spot",
+ region="region",
+ service_account="service_account",
+ service_account_scopes=["service_account_scopes"],
+ idle_stop_ttl=300,
+ auto_stop_time=timezone.datetime(2019, 9, 12),
+ customer_managed_key="customer_managed_key",
+ )
+ cluster = generator.make()
+ assert cluster == CONFIG_WITH_STOP_TTL
+
def test_build_with_gpu_accelerator(self):
generator = ClusterGenerator(
project_id="project_id",