This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 84aa6539e6b Add DNS endpoint support for GKE Hooks and Operators
(#48075)
84aa6539e6b is described below
commit 84aa6539e6bf7e315597148b1e88f8fdecac2a4b
Author: Maksim <[email protected]>
AuthorDate: Sat Mar 22 11:42:18 2025 -0700
Add DNS endpoint support for GKE Hooks and Operators (#48075)
---
generated/provider_dependencies.json | 2 +-
providers/google/README.rst | 2 +-
.../docs/operators/cloud/kubernetes_engine.rst | 5 ++
providers/google/pyproject.toml | 2 +-
.../google/cloud/hooks/kubernetes_engine.py | 18 ++++---
.../google/cloud/operators/kubernetes_engine.py | 55 +++++++++++++++++++--
.../airflow/providers/google/get_provider_info.py | 2 +-
.../cloud/operators/test_kubernetes_engine.py | 56 ++++++++++++++++++++--
8 files changed, 125 insertions(+), 17 deletions(-)
diff --git a/generated/provider_dependencies.json
b/generated/provider_dependencies.json
index 33beef3a1c2..95e149de682 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -672,7 +672,7 @@
"google-cloud-bigtable>=2.17.0",
"google-cloud-build>=3.31.0",
"google-cloud-compute>=1.10.0",
- "google-cloud-container>=2.17.4",
+ "google-cloud-container>=2.52.0",
"google-cloud-datacatalog>=3.23.0",
"google-cloud-dataflow-client>=0.8.6",
"google-cloud-dataform>=0.5.0",
diff --git a/providers/google/README.rst b/providers/google/README.rst
index 660bca2f438..e485ab507cb 100644
--- a/providers/google/README.rst
+++ b/providers/google/README.rst
@@ -83,7 +83,7 @@ PIP package Version required
``google-cloud-bigtable`` ``>=2.17.0``
``google-cloud-build`` ``>=3.31.0``
``google-cloud-compute`` ``>=1.10.0``
-``google-cloud-container`` ``>=2.17.4``
+``google-cloud-container`` ``>=2.52.0``
``google-cloud-datacatalog`` ``>=3.23.0``
``google-cloud-dataflow-client`` ``>=0.8.6``
``google-cloud-dataform`` ``>=0.5.0``
diff --git a/providers/google/docs/operators/cloud/kubernetes_engine.rst
b/providers/google/docs/operators/cloud/kubernetes_engine.rst
index d8e549fc986..87904b9eaae 100644
--- a/providers/google/docs/operators/cloud/kubernetes_engine.rst
+++ b/providers/google/docs/operators/cloud/kubernetes_engine.rst
@@ -150,6 +150,11 @@ Private clusters have two unique endpoint values:
``privateEndpoint``, which is
sets the external IP address as the endpoint by default. If you prefer to use
the internal IP as the
endpoint, you need to set ``use_internal_ip`` parameter to ``True``.
+Using with DNS endpoint cluster
+'''''''''''''''''''''''''''''''
+
+For running ``GKEStartPodOperator`` using DNS endpoint you need to set
``use_dns_endpoint`` parameter to ``True``.
+
Using with Autopilot (serverless) cluster
'''''''''''''''''''''''''''''''''''''''''
diff --git a/providers/google/pyproject.toml b/providers/google/pyproject.toml
index 6353dab6ef0..597e585a7db 100644
--- a/providers/google/pyproject.toml
+++ b/providers/google/pyproject.toml
@@ -84,7 +84,7 @@ dependencies = [
"google-cloud-bigtable>=2.17.0",
"google-cloud-build>=3.31.0",
"google-cloud-compute>=1.10.0",
- "google-cloud-container>=2.17.4",
+ "google-cloud-container>=2.52.0",
"google-cloud-datacatalog>=3.23.0",
"google-cloud-dataflow-client>=0.8.6",
"google-cloud-dataform>=0.5.0",
diff --git
a/providers/google/src/airflow/providers/google/cloud/hooks/kubernetes_engine.py
b/providers/google/src/airflow/providers/google/cloud/hooks/kubernetes_engine.py
index 3b791c5359d..309eb45d2e6 100644
---
a/providers/google/src/airflow/providers/google/cloud/hooks/kubernetes_engine.py
+++
b/providers/google/src/airflow/providers/google/cloud/hooks/kubernetes_engine.py
@@ -64,11 +64,13 @@ class GKEClusterConnection:
ssl_ca_cert: str,
credentials: google.auth.credentials.Credentials,
enable_tcp_keepalive: bool = False,
+ use_dns_endpoint: bool = False,
):
self._cluster_url = cluster_url
self._ssl_ca_cert = ssl_ca_cert
self._credentials = credentials
self.enable_tcp_keepalive = enable_tcp_keepalive
+ self.use_dns_endpoint = use_dns_endpoint
def get_conn(self) -> client.ApiClient:
configuration = self._get_config()
@@ -86,12 +88,13 @@ class GKEClusterConnection:
api_key_prefix={"authorization": "Bearer"},
api_key={"authorization": self._get_token(self._credentials)},
)
- configuration.ssl_ca_cert = FileOrData(
- {
- "certificate-authority-data": self._ssl_ca_cert,
- },
- file_key_name="certificate-authority",
- ).as_file()
+ if not self.use_dns_endpoint:
+ configuration.ssl_ca_cert = FileOrData(
+ {
+ "certificate-authority-data": self._ssl_ca_cert,
+ },
+ file_key_name="certificate-authority",
+ ).as_file()
return configuration
@staticmethod
@@ -417,6 +420,7 @@ class GKEKubernetesHook(GoogleBaseHook, KubernetesHook):
cluster_url: str,
ssl_ca_cert: str,
enable_tcp_keepalive: bool = False,
+ use_dns_endpoint: bool = False,
*args,
**kwargs,
):
@@ -424,6 +428,7 @@ class GKEKubernetesHook(GoogleBaseHook, KubernetesHook):
self._cluster_url = cluster_url
self._ssl_ca_cert = ssl_ca_cert
self.enable_tcp_keepalive = enable_tcp_keepalive
+ self.use_dns_endpoint = use_dns_endpoint
def get_conn(self) -> client.ApiClient:
return GKEClusterConnection(
@@ -431,6 +436,7 @@ class GKEKubernetesHook(GoogleBaseHook, KubernetesHook):
ssl_ca_cert=self._ssl_ca_cert,
credentials=self.get_credentials(),
enable_tcp_keepalive=self.enable_tcp_keepalive,
+ use_dns_endpoint=self.use_dns_endpoint,
).get_conn()
def apply_from_yaml_file(
diff --git
a/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
b/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
index 5288c3b4f9f..9e6675ca7af 100644
---
a/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
+++
b/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
@@ -89,6 +89,7 @@ class GKEClusterAuthDetails:
:param cluster_name: The name of the Google Kubernetes Engine cluster.
:param project_id: The Google Developers Console project id.
:param use_internal_ip: Use the internal IP address as the endpoint.
+ :param use_dns_endpoint: Use the DNS address as the endpoint.
:param cluster_hook: airflow hook for working with kubernetes cluster.
"""
@@ -97,11 +98,13 @@ class GKEClusterAuthDetails:
cluster_name: str,
project_id: str,
use_internal_ip: bool,
+ use_dns_endpoint: bool,
cluster_hook: GKEHook,
):
self.cluster_name = cluster_name
self.project_id = project_id
self.use_internal_ip = use_internal_ip
+ self.use_dns_endpoint = use_dns_endpoint
self.cluster_hook = cluster_hook
self._cluster_url: str
self._ssl_ca_cert: str
@@ -113,10 +116,14 @@ class GKEClusterAuthDetails:
project_id=self.project_id,
)
- if not self.use_internal_ip:
- self._cluster_url = f"https://{cluster.endpoint}"
- else:
+ if self.use_dns_endpoint:
+ self._cluster_url = (
+
f"https://{cluster.control_plane_endpoints_config.dns_endpoint_config.endpoint}"
+ )
+ elif self.use_internal_ip:
self._cluster_url =
f"https://{cluster.private_cluster_config.private_endpoint}"
+ else:
+ self._cluster_url = f"https://{cluster.endpoint}"
self._ssl_ca_cert = cluster.master_auth.cluster_ca_certificate
return self._cluster_url, self._ssl_ca_cert
@@ -130,6 +137,7 @@ class GKEOperatorMixin:
"location",
"cluster_name",
"use_internal_ip",
+ "use_dns_endpoint",
"project_id",
"gcp_conn_id",
"impersonation_chain",
@@ -151,6 +159,7 @@ class GKEOperatorMixin:
cluster_url=self.cluster_url,
ssl_ca_cert=self.ssl_ca_cert,
enable_tcp_keepalive=self.enable_tcp_keepalive,
+ use_dns_endpoint=self.use_dns_endpoint, # type:
ignore[attr-defined]
)
@cached_property
@@ -160,6 +169,7 @@ class GKEOperatorMixin:
cluster_name=self.cluster_name, # type: ignore[attr-defined]
project_id=self.project_id, # type: ignore[attr-defined]
use_internal_ip=self.use_internal_ip, # type: ignore[attr-defined]
+ use_dns_endpoint=self.use_dns_endpoint, # type:
ignore[attr-defined]
cluster_hook=self.cluster_hook,
)
return auth_details.fetch_cluster_info()
@@ -200,6 +210,7 @@ class GKEDeleteClusterOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
cluster resides, e.g. 'us-central1-a'
:param cluster_name: The name of the Google Kubernetes Engine cluster.
:param use_internal_ip: Use the internal IP address as the endpoint.
+ :param use_dns_endpoint: Use the DNS address as the endpoint.
:param project_id: The Google Developers Console project id
:param gcp_conn_id: The Google cloud connection id to use. This allows for
users to specify a service account.
@@ -225,6 +236,7 @@ class GKEDeleteClusterOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
self,
location: str,
use_internal_ip: bool = False,
+ use_dns_endpoint: bool = False,
project_id: str = PROVIDE_PROJECT_ID,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
@@ -241,6 +253,7 @@ class GKEDeleteClusterOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
self.location = location
self.cluster_name = cluster_name or name
self.use_internal_ip = use_internal_ip
+ self.use_dns_endpoint = use_dns_endpoint
self.project_id = project_id
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
@@ -347,6 +360,7 @@ class GKECreateClusterOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
:param location: The name of the Google Kubernetes Engine zone or region
in which the
cluster resides, e.g. 'us-central1-a'
:param use_internal_ip: Use the internal IP address as the endpoint.
+ :param use_dns_endpoint: Use the DNS address as the endpoint.
:param project_id: The Google Developers Console project id
:param gcp_conn_id: The Google cloud connection id to use. This allows for
users to specify a service account.
@@ -375,6 +389,7 @@ class GKECreateClusterOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
body: dict | Cluster,
location: str,
use_internal_ip: bool = False,
+ use_dns_endpoint: bool = False,
project_id: str = PROVIDE_PROJECT_ID,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
@@ -387,6 +402,7 @@ class GKECreateClusterOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
self.body = body
self.location = location
self.use_internal_ip = use_internal_ip
+ self.use_dns_endpoint = use_dns_endpoint
self.cluster_name = body.get("name") if isinstance(body, dict) else
getattr(body, "name", None)
self.project_id = project_id
self.gcp_conn_id = gcp_conn_id
@@ -519,6 +535,7 @@ class GKEStartKueueInsideClusterOperator(GKEOperatorMixin,
KubernetesInstallKueu
cluster resides, e.g. 'us-central1-a'
:param cluster_name: The name of the Google Kubernetes Engine cluster.
:param use_internal_ip: Use the internal IP address as the endpoint.
+ :param use_dns_endpoint: Use the DNS address as the endpoint.
:param project_id: The Google Developers Console project id
:param gcp_conn_id: The Google cloud connection id to use. This allows for
users to specify a service account.
@@ -543,6 +560,7 @@ class GKEStartKueueInsideClusterOperator(GKEOperatorMixin,
KubernetesInstallKueu
location: str,
cluster_name: str,
use_internal_ip: bool = False,
+ use_dns_endpoint: bool = False,
project_id: str = PROVIDE_PROJECT_ID,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
@@ -555,6 +573,7 @@ class GKEStartKueueInsideClusterOperator(GKEOperatorMixin,
KubernetesInstallKueu
self.cluster_name = cluster_name
self.gcp_conn_id = gcp_conn_id
self.use_internal_ip = use_internal_ip
+ self.use_dns_endpoint = use_dns_endpoint
self.impersonation_chain = impersonation_chain
def execute(self, context: Context):
@@ -592,6 +611,7 @@ class GKEStartPodOperator(GKEOperatorMixin,
KubernetesPodOperator):
cluster resides, e.g. 'us-central1-a'
:param cluster_name: The name of the Google Kubernetes Engine cluster.
:param use_internal_ip: Use the internal IP address as the endpoint.
+ :param use_dns_endpoint: Use the DNS address as the endpoint.
:param project_id: The Google Developers Console project id
:param gcp_conn_id: The Google cloud connection id to use. This allows for
users to specify a service account.
@@ -628,6 +648,7 @@ class GKEStartPodOperator(GKEOperatorMixin,
KubernetesPodOperator):
location: str,
cluster_name: str,
use_internal_ip: bool = False,
+ use_dns_endpoint: bool = False,
project_id: str = PROVIDE_PROJECT_ID,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
@@ -661,6 +682,7 @@ class GKEStartPodOperator(GKEOperatorMixin,
KubernetesPodOperator):
self.cluster_name = cluster_name
self.gcp_conn_id = gcp_conn_id
self.use_internal_ip = use_internal_ip
+ self.use_dns_endpoint = use_dns_endpoint
self.impersonation_chain = impersonation_chain
self._regional = regional
if is_delete_operator_pod is not None:
@@ -760,6 +782,7 @@ class GKEStartJobOperator(GKEOperatorMixin,
KubernetesJobOperator):
cluster resides, e.g. 'us-central1-a'
:param cluster_name: The name of the Google Kubernetes Engine cluster.
:param use_internal_ip: Use the internal IP address as the endpoint.
+ :param use_dns_endpoint: Use the DNS address as the endpoint.
:param project_id: The Google Developers Console project id
:param gcp_conn_id: The Google cloud connection id to use. This allows for
users to specify a service account.
@@ -788,6 +811,7 @@ class GKEStartJobOperator(GKEOperatorMixin,
KubernetesJobOperator):
location: str,
cluster_name: str,
use_internal_ip: bool = False,
+ use_dns_endpoint: bool = False,
project_id: str = PROVIDE_PROJECT_ID,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
@@ -804,6 +828,7 @@ class GKEStartJobOperator(GKEOperatorMixin,
KubernetesJobOperator):
self.cluster_name = cluster_name
self.gcp_conn_id = gcp_conn_id
self.use_internal_ip = use_internal_ip
+ self.use_dns_endpoint = use_dns_endpoint
self.impersonation_chain = impersonation_chain
# There is no need to manage the kube_config file, as it will be
generated automatically.
@@ -860,6 +885,7 @@ class GKEDescribeJobOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
cluster resides, e.g. 'us-central1-a'
:param cluster_name: The name of the Google Kubernetes Engine cluster.
:param use_internal_ip: Use the internal IP address as the endpoint.
+ :param use_dns_endpoint: Use the DNS address as the endpoint.
:param project_id: The Google Developers Console project id
:param gcp_conn_id: The Google cloud connection id to use. This allows for
users to specify a service account.
@@ -883,6 +909,7 @@ class GKEDescribeJobOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
location: str,
cluster_name: str,
use_internal_ip: bool = False,
+ use_dns_endpoint: bool = False,
project_id: str = PROVIDE_PROJECT_ID,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
@@ -898,6 +925,7 @@ class GKEDescribeJobOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
self.cluster_name = cluster_name
self.gcp_conn_id = gcp_conn_id
self.use_internal_ip = use_internal_ip
+ self.use_dns_endpoint = use_dns_endpoint
self.impersonation_chain = impersonation_chain
self.job: V1Job | None = None
@@ -928,6 +956,7 @@ class GKEListJobsOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
cluster resides, e.g. 'us-central1-a'
:param cluster_name: The name of the Google Kubernetes Engine cluster.
:param use_internal_ip: Use the internal IP address as the endpoint.
+ :param use_dns_endpoint: Use the DNS address as the endpoint.
:param project_id: The Google Developers Console project id
:param gcp_conn_id: The Google cloud connection id to use. This allows for
users to specify a service account.
@@ -951,6 +980,7 @@ class GKEListJobsOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
location: str,
cluster_name: str,
use_internal_ip: bool = False,
+ use_dns_endpoint: bool = False,
project_id: str = PROVIDE_PROJECT_ID,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
@@ -966,6 +996,7 @@ class GKEListJobsOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
self.cluster_name = cluster_name
self.gcp_conn_id = gcp_conn_id
self.use_internal_ip = use_internal_ip
+ self.use_dns_endpoint = use_dns_endpoint
self.impersonation_chain = impersonation_chain
self.namespace = namespace
self.do_xcom_push = do_xcom_push
@@ -1003,6 +1034,7 @@ class GKECreateCustomResourceOperator(GKEOperatorMixin,
KubernetesCreateResource
cluster resides, e.g. 'us-central1-a'
:param cluster_name: The name of the Google Kubernetes Engine cluster.
:param use_internal_ip: Use the internal IP address as the endpoint.
+ :param use_dns_endpoint: Use the DNS address as the endpoint.
:param project_id: The Google Developers Console project id
:param gcp_conn_id: The Google cloud connection id to use. This allows for
users to specify a service account.
@@ -1025,6 +1057,7 @@ class GKECreateCustomResourceOperator(GKEOperatorMixin,
KubernetesCreateResource
location: str,
cluster_name: str,
use_internal_ip: bool = False,
+ use_dns_endpoint: bool = False,
project_id: str = PROVIDE_PROJECT_ID,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
@@ -1038,6 +1071,7 @@ class GKECreateCustomResourceOperator(GKEOperatorMixin,
KubernetesCreateResource
self.cluster_name = cluster_name
self.gcp_conn_id = gcp_conn_id
self.use_internal_ip = use_internal_ip
+ self.use_dns_endpoint = use_dns_endpoint
self.impersonation_chain = impersonation_chain
if self.gcp_conn_id is None:
@@ -1073,6 +1107,7 @@ class GKEDeleteCustomResourceOperator(GKEOperatorMixin,
KubernetesDeleteResource
cluster resides, e.g. 'us-central1-a'
:param cluster_name: The name of the Google Kubernetes Engine cluster.
:param use_internal_ip: Use the internal IP address as the endpoint.
+ :param use_dns_endpoint: Use the DNS address as the endpoint.
:param project_id: The Google Developers Console project id
:param gcp_conn_id: The Google cloud connection id to use. This allows for
users to specify a service account.
@@ -1095,6 +1130,7 @@ class GKEDeleteCustomResourceOperator(GKEOperatorMixin,
KubernetesDeleteResource
location: str,
cluster_name: str,
use_internal_ip: bool = False,
+ use_dns_endpoint: bool = False,
project_id: str = PROVIDE_PROJECT_ID,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
@@ -1108,6 +1144,7 @@ class GKEDeleteCustomResourceOperator(GKEOperatorMixin,
KubernetesDeleteResource
self.cluster_name = cluster_name
self.gcp_conn_id = gcp_conn_id
self.use_internal_ip = use_internal_ip
+ self.use_dns_endpoint = use_dns_endpoint
self.impersonation_chain = impersonation_chain
if self.gcp_conn_id is None:
@@ -1132,6 +1169,7 @@ class GKEStartKueueJobOperator(GKEOperatorMixin,
KubernetesStartKueueJobOperator
cluster resides, e.g. 'us-central1-a'
:param cluster_name: The name of the Google Kubernetes Engine cluster.
:param use_internal_ip: Use the internal IP address as the endpoint.
+ :param use_dns_endpoint: Use the DNS address as the endpoint.
:param project_id: The Google Developers Console project id
:param gcp_conn_id: The Google cloud connection id to use. This allows for
users to specify a service account.
@@ -1154,6 +1192,7 @@ class GKEStartKueueJobOperator(GKEOperatorMixin,
KubernetesStartKueueJobOperator
location: str,
cluster_name: str,
use_internal_ip: bool = False,
+ use_dns_endpoint: bool = False,
project_id: str = PROVIDE_PROJECT_ID,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
@@ -1166,6 +1205,7 @@ class GKEStartKueueJobOperator(GKEOperatorMixin,
KubernetesStartKueueJobOperator
self.cluster_name = cluster_name
self.gcp_conn_id = gcp_conn_id
self.use_internal_ip = use_internal_ip
+ self.use_dns_endpoint = use_dns_endpoint
self.impersonation_chain = impersonation_chain
@@ -1192,6 +1232,7 @@ class GKEDeleteJobOperator(GKEOperatorMixin,
KubernetesDeleteJobOperator):
cluster resides, e.g. 'us-central1-a'
:param cluster_name: The name of the Google Kubernetes Engine cluster.
:param use_internal_ip: Use the internal IP address as the endpoint.
+ :param use_dns_endpoint: Use the DNS address as the endpoint.
:param project_id: The Google Developers Console project id
:param gcp_conn_id: The Google cloud connection id to use. This allows for
users to specify a service account.
@@ -1214,6 +1255,7 @@ class GKEDeleteJobOperator(GKEOperatorMixin,
KubernetesDeleteJobOperator):
location: str,
cluster_name: str,
use_internal_ip: bool = False,
+ use_dns_endpoint: bool = False,
project_id: str = PROVIDE_PROJECT_ID,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
@@ -1227,6 +1269,7 @@ class GKEDeleteJobOperator(GKEOperatorMixin,
KubernetesDeleteJobOperator):
self.cluster_name = cluster_name
self.gcp_conn_id = gcp_conn_id
self.use_internal_ip = use_internal_ip
+ self.use_dns_endpoint = use_dns_endpoint
self.impersonation_chain = impersonation_chain
if self.gcp_conn_id is None:
@@ -1255,6 +1298,7 @@ class GKESuspendJobOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
cluster resides, e.g. 'us-central1-a'
:param cluster_name: The name of the Google Kubernetes Engine cluster.
:param use_internal_ip: Use the internal IP address as the endpoint.
+ :param use_dns_endpoint: Use the DNS address as the endpoint.
:param project_id: The Google Developers Console project id
:param gcp_conn_id: The Google cloud connection id to use. This allows for
users to specify a service account.
@@ -1278,6 +1322,7 @@ class GKESuspendJobOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
location: str,
cluster_name: str,
use_internal_ip: bool = False,
+ use_dns_endpoint: bool = False,
project_id: str = PROVIDE_PROJECT_ID,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
@@ -1293,6 +1338,7 @@ class GKESuspendJobOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
self.cluster_name = cluster_name
self.gcp_conn_id = gcp_conn_id
self.use_internal_ip = use_internal_ip
+ self.use_dns_endpoint = use_dns_endpoint
self.impersonation_chain = impersonation_chain
self.job: V1Job | None = None
@@ -1326,6 +1372,7 @@ class GKEResumeJobOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
cluster resides, e.g. 'us-central1-a'
:param cluster_name: The name of the Google Kubernetes Engine cluster.
:param use_internal_ip: Use the internal IP address as the endpoint.
+ :param use_dns_endpoint: Use the DNS address as the endpoint.
:param project_id: The Google Developers Console project id
:param gcp_conn_id: The Google cloud connection id to use. This allows for
users to specify a service account.
@@ -1349,6 +1396,7 @@ class GKEResumeJobOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
location: str,
cluster_name: str,
use_internal_ip: bool = False,
+ use_dns_endpoint: bool = False,
project_id: str = PROVIDE_PROJECT_ID,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
@@ -1364,6 +1412,7 @@ class GKEResumeJobOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
self.cluster_name = cluster_name
self.gcp_conn_id = gcp_conn_id
self.use_internal_ip = use_internal_ip
+ self.use_dns_endpoint = use_dns_endpoint
self.impersonation_chain = impersonation_chain
self.job: V1Job | None = None
diff --git a/providers/google/src/airflow/providers/google/get_provider_info.py
b/providers/google/src/airflow/providers/google/get_provider_info.py
index 29dccb0845c..29e5899f1d4 100644
--- a/providers/google/src/airflow/providers/google/get_provider_info.py
+++ b/providers/google/src/airflow/providers/google/get_provider_info.py
@@ -1607,7 +1607,7 @@ def get_provider_info():
"google-cloud-bigtable>=2.17.0",
"google-cloud-build>=3.31.0",
"google-cloud-compute>=1.10.0",
- "google-cloud-container>=2.17.4",
+ "google-cloud-container>=2.52.0",
"google-cloud-datacatalog>=3.23.0",
"google-cloud-dataflow-client>=0.8.6",
"google-cloud-dataform>=0.5.0",
diff --git
a/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py
b/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py
index 92311abd18f..d564d983a48 100644
---
a/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py
+++
b/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py
@@ -78,8 +78,10 @@ TEST_POLL_INTERVAL = 20.0
GKE_CLUSTER_NAME = "test-cluster-name"
GKE_CLUSTER_ENDPOINT = "test-host"
GKE_CLUSTER_PRIVATE_ENDPOINT = "test-private-host"
+GKE_CLUSTER_DNS_ENDPOINT = f"gke-dns-endpoint.{TEST_LOCATION}.gke.goog"
GKE_CLUSTER_URL = f"https://{GKE_CLUSTER_ENDPOINT}"
GKE_CLUSTER_PRIVATE_URL = f"https://{GKE_CLUSTER_PRIVATE_ENDPOINT}"
+GKE_CLUSTER_DNS_URL = f"https://{GKE_CLUSTER_DNS_ENDPOINT}"
GKE_SSL_CA_CERT = "TEST_SSL_CA_CERT_CONTENT"
GKE_CLUSTER_CREATE_BODY_DICT = {
@@ -102,16 +104,57 @@ GKE_OPERATORS_PATH =
"airflow.providers.google.cloud.operators.kubernetes_engine
class TestGKEClusterAuthDetails:
@pytest.mark.parametrize(
- "use_internal_ip, endpoint, private_endpoint, expected_cluster_url",
+ "use_dns_endpoint, use_internal_ip, endpoint, private_endpoint,
dns_endpoint, expected_cluster_url",
[
- (False, GKE_CLUSTER_ENDPOINT, GKE_CLUSTER_PRIVATE_ENDPOINT,
GKE_CLUSTER_URL),
- (True, GKE_CLUSTER_ENDPOINT, GKE_CLUSTER_PRIVATE_ENDPOINT,
GKE_CLUSTER_PRIVATE_URL),
+ (
+ False,
+ False,
+ GKE_CLUSTER_ENDPOINT,
+ GKE_CLUSTER_PRIVATE_ENDPOINT,
+ GKE_CLUSTER_DNS_ENDPOINT,
+ GKE_CLUSTER_URL,
+ ),
+ (
+ False,
+ True,
+ GKE_CLUSTER_ENDPOINT,
+ GKE_CLUSTER_PRIVATE_ENDPOINT,
+ GKE_CLUSTER_DNS_ENDPOINT,
+ GKE_CLUSTER_PRIVATE_URL,
+ ),
+ (
+ True,
+ False,
+ GKE_CLUSTER_ENDPOINT,
+ GKE_CLUSTER_PRIVATE_ENDPOINT,
+ GKE_CLUSTER_DNS_ENDPOINT,
+ GKE_CLUSTER_DNS_URL,
+ ),
+ (
+ True,
+ True,
+ GKE_CLUSTER_ENDPOINT,
+ GKE_CLUSTER_PRIVATE_ENDPOINT,
+ GKE_CLUSTER_DNS_ENDPOINT,
+ GKE_CLUSTER_DNS_URL,
+ ),
],
)
- def test_fetch_cluster_info(self, use_internal_ip, endpoint,
private_endpoint, expected_cluster_url):
+ def test_fetch_cluster_info(
+ self,
+ use_dns_endpoint,
+ use_internal_ip,
+ endpoint,
+ private_endpoint,
+ dns_endpoint,
+ expected_cluster_url,
+ ):
mock_cluster = mock.MagicMock(
endpoint=endpoint,
private_cluster_config=mock.MagicMock(private_endpoint=private_endpoint),
+ control_plane_endpoints_config=mock.MagicMock(
+ dns_endpoint_config=mock.MagicMock(endpoint=dns_endpoint)
+ ),
master_auth=mock.MagicMock(cluster_ca_certificate=GKE_SSL_CA_CERT),
)
mock_cluster_hook =
mock.MagicMock(get_cluster=mock.MagicMock(return_value=mock_cluster))
@@ -120,6 +163,7 @@ class TestGKEClusterAuthDetails:
cluster_name=GKE_CLUSTER_NAME,
project_id=TEST_PROJECT_ID,
use_internal_ip=use_internal_ip,
+ use_dns_endpoint=use_dns_endpoint,
cluster_hook=mock_cluster_hook,
)
@@ -140,12 +184,14 @@ class TestGKEOperatorMixin:
self.operator.impersonation_chain = TEST_IMPERSONATION_CHAIN
self.operator.project_id = TEST_PROJECT_ID
self.operator.use_internal_ip = False
+ self.operator.use_dns_endpoint = False
def test_template_fields(self):
expected_template_fields = {
"location",
"cluster_name",
"use_internal_ip",
+ "use_dns_endpoint",
"project_id",
"gcp_conn_id",
"impersonation_chain",
@@ -180,6 +226,7 @@ class TestGKEOperatorMixin:
cluster_url=GKE_CLUSTER_URL,
ssl_ca_cert=GKE_SSL_CA_CERT,
enable_tcp_keepalive=False,
+ use_dns_endpoint=False,
)
@mock.patch(GKE_OPERATORS_PATH.format("GKEHook"))
@@ -195,6 +242,7 @@ class TestGKEOperatorMixin:
cluster_name=self.operator.cluster_name,
project_id=self.operator.project_id,
use_internal_ip=self.operator.use_internal_ip,
+ use_dns_endpoint=self.operator.use_dns_endpoint,
cluster_hook=self.operator.cluster_hook,
)
mock_fetch_cluster_info.assert_called_once_with()