This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 84aa6539e6b Add DNS endpoint support for GKE Hooks and Operators (#48075)
84aa6539e6b is described below

commit 84aa6539e6bf7e315597148b1e88f8fdecac2a4b
Author: Maksim <[email protected]>
AuthorDate: Sat Mar 22 11:42:18 2025 -0700

    Add DNS endpoint support for GKE Hooks and Operators (#48075)
---
 generated/provider_dependencies.json               |  2 +-
 providers/google/README.rst                        |  2 +-
 .../docs/operators/cloud/kubernetes_engine.rst     |  5 ++
 providers/google/pyproject.toml                    |  2 +-
 .../google/cloud/hooks/kubernetes_engine.py        | 18 ++++---
 .../google/cloud/operators/kubernetes_engine.py    | 55 +++++++++++++++++++--
 .../airflow/providers/google/get_provider_info.py  |  2 +-
 .../cloud/operators/test_kubernetes_engine.py      | 56 ++++++++++++++++++++--
 8 files changed, 125 insertions(+), 17 deletions(-)

diff --git a/generated/provider_dependencies.json 
b/generated/provider_dependencies.json
index 33beef3a1c2..95e149de682 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -672,7 +672,7 @@
       "google-cloud-bigtable>=2.17.0",
       "google-cloud-build>=3.31.0",
       "google-cloud-compute>=1.10.0",
-      "google-cloud-container>=2.17.4",
+      "google-cloud-container>=2.52.0",
       "google-cloud-datacatalog>=3.23.0",
       "google-cloud-dataflow-client>=0.8.6",
       "google-cloud-dataform>=0.5.0",
diff --git a/providers/google/README.rst b/providers/google/README.rst
index 660bca2f438..e485ab507cb 100644
--- a/providers/google/README.rst
+++ b/providers/google/README.rst
@@ -83,7 +83,7 @@ PIP package                                 Version required
 ``google-cloud-bigtable``                   ``>=2.17.0``
 ``google-cloud-build``                      ``>=3.31.0``
 ``google-cloud-compute``                    ``>=1.10.0``
-``google-cloud-container``                  ``>=2.17.4``
+``google-cloud-container``                  ``>=2.52.0``
 ``google-cloud-datacatalog``                ``>=3.23.0``
 ``google-cloud-dataflow-client``            ``>=0.8.6``
 ``google-cloud-dataform``                   ``>=0.5.0``
diff --git a/providers/google/docs/operators/cloud/kubernetes_engine.rst 
b/providers/google/docs/operators/cloud/kubernetes_engine.rst
index d8e549fc986..87904b9eaae 100644
--- a/providers/google/docs/operators/cloud/kubernetes_engine.rst
+++ b/providers/google/docs/operators/cloud/kubernetes_engine.rst
@@ -150,6 +150,11 @@ Private clusters have two unique endpoint values: 
``privateEndpoint``, which is
 sets the external IP address as the endpoint by default. If you prefer to use 
the internal IP as the
 endpoint, you need to set ``use_internal_ip`` parameter to ``True``.
 
+Using with DNS endpoint cluster
+'''''''''''''''''''''''''''''''
+
+For running ``GKEStartPodOperator`` using DNS endpoint you need to set ``use_dns_endpoint`` parameter to ``True``.
+
 Using with Autopilot (serverless) cluster
 '''''''''''''''''''''''''''''''''''''''''
 
diff --git a/providers/google/pyproject.toml b/providers/google/pyproject.toml
index 6353dab6ef0..597e585a7db 100644
--- a/providers/google/pyproject.toml
+++ b/providers/google/pyproject.toml
@@ -84,7 +84,7 @@ dependencies = [
     "google-cloud-bigtable>=2.17.0",
     "google-cloud-build>=3.31.0",
     "google-cloud-compute>=1.10.0",
-    "google-cloud-container>=2.17.4",
+    "google-cloud-container>=2.52.0",
     "google-cloud-datacatalog>=3.23.0",
     "google-cloud-dataflow-client>=0.8.6",
     "google-cloud-dataform>=0.5.0",
diff --git 
a/providers/google/src/airflow/providers/google/cloud/hooks/kubernetes_engine.py
 
b/providers/google/src/airflow/providers/google/cloud/hooks/kubernetes_engine.py
index 3b791c5359d..309eb45d2e6 100644
--- 
a/providers/google/src/airflow/providers/google/cloud/hooks/kubernetes_engine.py
+++ 
b/providers/google/src/airflow/providers/google/cloud/hooks/kubernetes_engine.py
@@ -64,11 +64,13 @@ class GKEClusterConnection:
         ssl_ca_cert: str,
         credentials: google.auth.credentials.Credentials,
         enable_tcp_keepalive: bool = False,
+        use_dns_endpoint: bool = False,
     ):
         self._cluster_url = cluster_url
         self._ssl_ca_cert = ssl_ca_cert
         self._credentials = credentials
         self.enable_tcp_keepalive = enable_tcp_keepalive
+        self.use_dns_endpoint = use_dns_endpoint
 
     def get_conn(self) -> client.ApiClient:
         configuration = self._get_config()
@@ -86,12 +88,13 @@ class GKEClusterConnection:
             api_key_prefix={"authorization": "Bearer"},
             api_key={"authorization": self._get_token(self._credentials)},
         )
-        configuration.ssl_ca_cert = FileOrData(
-            {
-                "certificate-authority-data": self._ssl_ca_cert,
-            },
-            file_key_name="certificate-authority",
-        ).as_file()
+        if not self.use_dns_endpoint:
+            configuration.ssl_ca_cert = FileOrData(
+                {
+                    "certificate-authority-data": self._ssl_ca_cert,
+                },
+                file_key_name="certificate-authority",
+            ).as_file()
         return configuration
 
     @staticmethod
@@ -417,6 +420,7 @@ class GKEKubernetesHook(GoogleBaseHook, KubernetesHook):
         cluster_url: str,
         ssl_ca_cert: str,
         enable_tcp_keepalive: bool = False,
+        use_dns_endpoint: bool = False,
         *args,
         **kwargs,
     ):
@@ -424,6 +428,7 @@ class GKEKubernetesHook(GoogleBaseHook, KubernetesHook):
         self._cluster_url = cluster_url
         self._ssl_ca_cert = ssl_ca_cert
         self.enable_tcp_keepalive = enable_tcp_keepalive
+        self.use_dns_endpoint = use_dns_endpoint
 
     def get_conn(self) -> client.ApiClient:
         return GKEClusterConnection(
@@ -431,6 +436,7 @@ class GKEKubernetesHook(GoogleBaseHook, KubernetesHook):
             ssl_ca_cert=self._ssl_ca_cert,
             credentials=self.get_credentials(),
             enable_tcp_keepalive=self.enable_tcp_keepalive,
+            use_dns_endpoint=self.use_dns_endpoint,
         ).get_conn()
 
     def apply_from_yaml_file(
diff --git 
a/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
 
b/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
index 5288c3b4f9f..9e6675ca7af 100644
--- 
a/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
+++ 
b/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
@@ -89,6 +89,7 @@ class GKEClusterAuthDetails:
     :param cluster_name: The name of the Google Kubernetes Engine cluster.
     :param project_id: The Google Developers Console project id.
     :param use_internal_ip: Use the internal IP address as the endpoint.
+    :param use_dns_endpoint: Use the DNS address as the endpoint.
     :param cluster_hook: airflow hook for working with kubernetes cluster.
     """
 
@@ -97,11 +98,13 @@ class GKEClusterAuthDetails:
         cluster_name: str,
         project_id: str,
         use_internal_ip: bool,
+        use_dns_endpoint: bool,
         cluster_hook: GKEHook,
     ):
         self.cluster_name = cluster_name
         self.project_id = project_id
         self.use_internal_ip = use_internal_ip
+        self.use_dns_endpoint = use_dns_endpoint
         self.cluster_hook = cluster_hook
         self._cluster_url: str
         self._ssl_ca_cert: str
@@ -113,10 +116,14 @@ class GKEClusterAuthDetails:
             project_id=self.project_id,
         )
 
-        if not self.use_internal_ip:
-            self._cluster_url = f"https://{cluster.endpoint}"
-        else:
+        if self.use_dns_endpoint:
+            self._cluster_url = (
+                f"https://{cluster.control_plane_endpoints_config.dns_endpoint_config.endpoint}"
+            )
+        elif self.use_internal_ip:
             self._cluster_url = f"https://{cluster.private_cluster_config.private_endpoint}"
+        else:
+            self._cluster_url = f"https://{cluster.endpoint}"
         self._ssl_ca_cert = cluster.master_auth.cluster_ca_certificate
         return self._cluster_url, self._ssl_ca_cert
 
@@ -130,6 +137,7 @@ class GKEOperatorMixin:
         "location",
         "cluster_name",
         "use_internal_ip",
+        "use_dns_endpoint",
         "project_id",
         "gcp_conn_id",
         "impersonation_chain",
@@ -151,6 +159,7 @@ class GKEOperatorMixin:
             cluster_url=self.cluster_url,
             ssl_ca_cert=self.ssl_ca_cert,
             enable_tcp_keepalive=self.enable_tcp_keepalive,
+            use_dns_endpoint=self.use_dns_endpoint,  # type: ignore[attr-defined]
         )
 
     @cached_property
@@ -160,6 +169,7 @@ class GKEOperatorMixin:
             cluster_name=self.cluster_name,  # type: ignore[attr-defined]
             project_id=self.project_id,  # type: ignore[attr-defined]
             use_internal_ip=self.use_internal_ip,  # type: ignore[attr-defined]
+            use_dns_endpoint=self.use_dns_endpoint,  # type: ignore[attr-defined]
             cluster_hook=self.cluster_hook,
         )
         return auth_details.fetch_cluster_info()
@@ -200,6 +210,7 @@ class GKEDeleteClusterOperator(GKEOperatorMixin, 
GoogleCloudBaseOperator):
         cluster resides, e.g. 'us-central1-a'
     :param cluster_name: The name of the Google Kubernetes Engine cluster.
     :param use_internal_ip: Use the internal IP address as the endpoint.
+    :param use_dns_endpoint: Use the DNS address as the endpoint.
     :param project_id: The Google Developers Console project id
     :param gcp_conn_id: The Google cloud connection id to use. This allows for
         users to specify a service account.
@@ -225,6 +236,7 @@ class GKEDeleteClusterOperator(GKEOperatorMixin, 
GoogleCloudBaseOperator):
         self,
         location: str,
         use_internal_ip: bool = False,
+        use_dns_endpoint: bool = False,
         project_id: str = PROVIDE_PROJECT_ID,
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
@@ -241,6 +253,7 @@ class GKEDeleteClusterOperator(GKEOperatorMixin, 
GoogleCloudBaseOperator):
         self.location = location
         self.cluster_name = cluster_name or name
         self.use_internal_ip = use_internal_ip
+        self.use_dns_endpoint = use_dns_endpoint
         self.project_id = project_id
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
@@ -347,6 +360,7 @@ class GKECreateClusterOperator(GKEOperatorMixin, 
GoogleCloudBaseOperator):
     :param location: The name of the Google Kubernetes Engine zone or region 
in which the
         cluster resides, e.g. 'us-central1-a'
     :param use_internal_ip: Use the internal IP address as the endpoint.
+    :param use_dns_endpoint: Use the DNS address as the endpoint.
     :param project_id: The Google Developers Console project id
     :param gcp_conn_id: The Google cloud connection id to use. This allows for
         users to specify a service account.
@@ -375,6 +389,7 @@ class GKECreateClusterOperator(GKEOperatorMixin, 
GoogleCloudBaseOperator):
         body: dict | Cluster,
         location: str,
         use_internal_ip: bool = False,
+        use_dns_endpoint: bool = False,
         project_id: str = PROVIDE_PROJECT_ID,
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
@@ -387,6 +402,7 @@ class GKECreateClusterOperator(GKEOperatorMixin, 
GoogleCloudBaseOperator):
         self.body = body
         self.location = location
         self.use_internal_ip = use_internal_ip
+        self.use_dns_endpoint = use_dns_endpoint
         self.cluster_name = body.get("name") if isinstance(body, dict) else 
getattr(body, "name", None)
         self.project_id = project_id
         self.gcp_conn_id = gcp_conn_id
@@ -519,6 +535,7 @@ class GKEStartKueueInsideClusterOperator(GKEOperatorMixin, 
KubernetesInstallKueu
         cluster resides, e.g. 'us-central1-a'
     :param cluster_name: The name of the Google Kubernetes Engine cluster.
     :param use_internal_ip: Use the internal IP address as the endpoint.
+    :param use_dns_endpoint: Use the DNS address as the endpoint.
     :param project_id: The Google Developers Console project id
     :param gcp_conn_id: The Google cloud connection id to use. This allows for
         users to specify a service account.
@@ -543,6 +560,7 @@ class GKEStartKueueInsideClusterOperator(GKEOperatorMixin, 
KubernetesInstallKueu
         location: str,
         cluster_name: str,
         use_internal_ip: bool = False,
+        use_dns_endpoint: bool = False,
         project_id: str = PROVIDE_PROJECT_ID,
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
@@ -555,6 +573,7 @@ class GKEStartKueueInsideClusterOperator(GKEOperatorMixin, 
KubernetesInstallKueu
         self.cluster_name = cluster_name
         self.gcp_conn_id = gcp_conn_id
         self.use_internal_ip = use_internal_ip
+        self.use_dns_endpoint = use_dns_endpoint
         self.impersonation_chain = impersonation_chain
 
     def execute(self, context: Context):
@@ -592,6 +611,7 @@ class GKEStartPodOperator(GKEOperatorMixin, 
KubernetesPodOperator):
         cluster resides, e.g. 'us-central1-a'
     :param cluster_name: The name of the Google Kubernetes Engine cluster.
     :param use_internal_ip: Use the internal IP address as the endpoint.
+    :param use_dns_endpoint: Use the DNS address as the endpoint.
     :param project_id: The Google Developers Console project id
     :param gcp_conn_id: The Google cloud connection id to use. This allows for
         users to specify a service account.
@@ -628,6 +648,7 @@ class GKEStartPodOperator(GKEOperatorMixin, 
KubernetesPodOperator):
         location: str,
         cluster_name: str,
         use_internal_ip: bool = False,
+        use_dns_endpoint: bool = False,
         project_id: str = PROVIDE_PROJECT_ID,
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
@@ -661,6 +682,7 @@ class GKEStartPodOperator(GKEOperatorMixin, 
KubernetesPodOperator):
         self.cluster_name = cluster_name
         self.gcp_conn_id = gcp_conn_id
         self.use_internal_ip = use_internal_ip
+        self.use_dns_endpoint = use_dns_endpoint
         self.impersonation_chain = impersonation_chain
         self._regional = regional
         if is_delete_operator_pod is not None:
@@ -760,6 +782,7 @@ class GKEStartJobOperator(GKEOperatorMixin, 
KubernetesJobOperator):
         cluster resides, e.g. 'us-central1-a'
     :param cluster_name: The name of the Google Kubernetes Engine cluster.
     :param use_internal_ip: Use the internal IP address as the endpoint.
+    :param use_dns_endpoint: Use the DNS address as the endpoint.
     :param project_id: The Google Developers Console project id
     :param gcp_conn_id: The Google cloud connection id to use. This allows for
         users to specify a service account.
@@ -788,6 +811,7 @@ class GKEStartJobOperator(GKEOperatorMixin, 
KubernetesJobOperator):
         location: str,
         cluster_name: str,
         use_internal_ip: bool = False,
+        use_dns_endpoint: bool = False,
         project_id: str = PROVIDE_PROJECT_ID,
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
@@ -804,6 +828,7 @@ class GKEStartJobOperator(GKEOperatorMixin, 
KubernetesJobOperator):
         self.cluster_name = cluster_name
         self.gcp_conn_id = gcp_conn_id
         self.use_internal_ip = use_internal_ip
+        self.use_dns_endpoint = use_dns_endpoint
         self.impersonation_chain = impersonation_chain
 
         # There is no need to manage the kube_config file, as it will be 
generated automatically.
@@ -860,6 +885,7 @@ class GKEDescribeJobOperator(GKEOperatorMixin, 
GoogleCloudBaseOperator):
         cluster resides, e.g. 'us-central1-a'
     :param cluster_name: The name of the Google Kubernetes Engine cluster.
     :param use_internal_ip: Use the internal IP address as the endpoint.
+    :param use_dns_endpoint: Use the DNS address as the endpoint.
     :param project_id: The Google Developers Console project id
     :param gcp_conn_id: The Google cloud connection id to use. This allows for
         users to specify a service account.
@@ -883,6 +909,7 @@ class GKEDescribeJobOperator(GKEOperatorMixin, 
GoogleCloudBaseOperator):
         location: str,
         cluster_name: str,
         use_internal_ip: bool = False,
+        use_dns_endpoint: bool = False,
         project_id: str = PROVIDE_PROJECT_ID,
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
@@ -898,6 +925,7 @@ class GKEDescribeJobOperator(GKEOperatorMixin, 
GoogleCloudBaseOperator):
         self.cluster_name = cluster_name
         self.gcp_conn_id = gcp_conn_id
         self.use_internal_ip = use_internal_ip
+        self.use_dns_endpoint = use_dns_endpoint
         self.impersonation_chain = impersonation_chain
         self.job: V1Job | None = None
 
@@ -928,6 +956,7 @@ class GKEListJobsOperator(GKEOperatorMixin, 
GoogleCloudBaseOperator):
         cluster resides, e.g. 'us-central1-a'
     :param cluster_name: The name of the Google Kubernetes Engine cluster.
     :param use_internal_ip: Use the internal IP address as the endpoint.
+    :param use_dns_endpoint: Use the DNS address as the endpoint.
     :param project_id: The Google Developers Console project id
     :param gcp_conn_id: The Google cloud connection id to use. This allows for
         users to specify a service account.
@@ -951,6 +980,7 @@ class GKEListJobsOperator(GKEOperatorMixin, 
GoogleCloudBaseOperator):
         location: str,
         cluster_name: str,
         use_internal_ip: bool = False,
+        use_dns_endpoint: bool = False,
         project_id: str = PROVIDE_PROJECT_ID,
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
@@ -966,6 +996,7 @@ class GKEListJobsOperator(GKEOperatorMixin, 
GoogleCloudBaseOperator):
         self.cluster_name = cluster_name
         self.gcp_conn_id = gcp_conn_id
         self.use_internal_ip = use_internal_ip
+        self.use_dns_endpoint = use_dns_endpoint
         self.impersonation_chain = impersonation_chain
         self.namespace = namespace
         self.do_xcom_push = do_xcom_push
@@ -1003,6 +1034,7 @@ class GKECreateCustomResourceOperator(GKEOperatorMixin, 
KubernetesCreateResource
         cluster resides, e.g. 'us-central1-a'
     :param cluster_name: The name of the Google Kubernetes Engine cluster.
     :param use_internal_ip: Use the internal IP address as the endpoint.
+    :param use_dns_endpoint: Use the DNS address as the endpoint.
     :param project_id: The Google Developers Console project id
     :param gcp_conn_id: The Google cloud connection id to use. This allows for
         users to specify a service account.
@@ -1025,6 +1057,7 @@ class GKECreateCustomResourceOperator(GKEOperatorMixin, 
KubernetesCreateResource
         location: str,
         cluster_name: str,
         use_internal_ip: bool = False,
+        use_dns_endpoint: bool = False,
         project_id: str = PROVIDE_PROJECT_ID,
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
@@ -1038,6 +1071,7 @@ class GKECreateCustomResourceOperator(GKEOperatorMixin, 
KubernetesCreateResource
         self.cluster_name = cluster_name
         self.gcp_conn_id = gcp_conn_id
         self.use_internal_ip = use_internal_ip
+        self.use_dns_endpoint = use_dns_endpoint
         self.impersonation_chain = impersonation_chain
 
         if self.gcp_conn_id is None:
@@ -1073,6 +1107,7 @@ class GKEDeleteCustomResourceOperator(GKEOperatorMixin, 
KubernetesDeleteResource
         cluster resides, e.g. 'us-central1-a'
     :param cluster_name: The name of the Google Kubernetes Engine cluster.
     :param use_internal_ip: Use the internal IP address as the endpoint.
+    :param use_dns_endpoint: Use the DNS address as the endpoint.
     :param project_id: The Google Developers Console project id
     :param gcp_conn_id: The Google cloud connection id to use. This allows for
         users to specify a service account.
@@ -1095,6 +1130,7 @@ class GKEDeleteCustomResourceOperator(GKEOperatorMixin, 
KubernetesDeleteResource
         location: str,
         cluster_name: str,
         use_internal_ip: bool = False,
+        use_dns_endpoint: bool = False,
         project_id: str = PROVIDE_PROJECT_ID,
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
@@ -1108,6 +1144,7 @@ class GKEDeleteCustomResourceOperator(GKEOperatorMixin, 
KubernetesDeleteResource
         self.cluster_name = cluster_name
         self.gcp_conn_id = gcp_conn_id
         self.use_internal_ip = use_internal_ip
+        self.use_dns_endpoint = use_dns_endpoint
         self.impersonation_chain = impersonation_chain
 
         if self.gcp_conn_id is None:
@@ -1132,6 +1169,7 @@ class GKEStartKueueJobOperator(GKEOperatorMixin, 
KubernetesStartKueueJobOperator
         cluster resides, e.g. 'us-central1-a'
     :param cluster_name: The name of the Google Kubernetes Engine cluster.
     :param use_internal_ip: Use the internal IP address as the endpoint.
+    :param use_dns_endpoint: Use the DNS address as the endpoint.
     :param project_id: The Google Developers Console project id
     :param gcp_conn_id: The Google cloud connection id to use. This allows for
         users to specify a service account.
@@ -1154,6 +1192,7 @@ class GKEStartKueueJobOperator(GKEOperatorMixin, 
KubernetesStartKueueJobOperator
         location: str,
         cluster_name: str,
         use_internal_ip: bool = False,
+        use_dns_endpoint: bool = False,
         project_id: str = PROVIDE_PROJECT_ID,
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
@@ -1166,6 +1205,7 @@ class GKEStartKueueJobOperator(GKEOperatorMixin, 
KubernetesStartKueueJobOperator
         self.cluster_name = cluster_name
         self.gcp_conn_id = gcp_conn_id
         self.use_internal_ip = use_internal_ip
+        self.use_dns_endpoint = use_dns_endpoint
         self.impersonation_chain = impersonation_chain
 
 
@@ -1192,6 +1232,7 @@ class GKEDeleteJobOperator(GKEOperatorMixin, 
KubernetesDeleteJobOperator):
         cluster resides, e.g. 'us-central1-a'
     :param cluster_name: The name of the Google Kubernetes Engine cluster.
     :param use_internal_ip: Use the internal IP address as the endpoint.
+    :param use_dns_endpoint: Use the DNS address as the endpoint.
     :param project_id: The Google Developers Console project id
     :param gcp_conn_id: The Google cloud connection id to use. This allows for
         users to specify a service account.
@@ -1214,6 +1255,7 @@ class GKEDeleteJobOperator(GKEOperatorMixin, 
KubernetesDeleteJobOperator):
         location: str,
         cluster_name: str,
         use_internal_ip: bool = False,
+        use_dns_endpoint: bool = False,
         project_id: str = PROVIDE_PROJECT_ID,
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
@@ -1227,6 +1269,7 @@ class GKEDeleteJobOperator(GKEOperatorMixin, 
KubernetesDeleteJobOperator):
         self.cluster_name = cluster_name
         self.gcp_conn_id = gcp_conn_id
         self.use_internal_ip = use_internal_ip
+        self.use_dns_endpoint = use_dns_endpoint
         self.impersonation_chain = impersonation_chain
 
         if self.gcp_conn_id is None:
@@ -1255,6 +1298,7 @@ class GKESuspendJobOperator(GKEOperatorMixin, 
GoogleCloudBaseOperator):
         cluster resides, e.g. 'us-central1-a'
     :param cluster_name: The name of the Google Kubernetes Engine cluster.
     :param use_internal_ip: Use the internal IP address as the endpoint.
+    :param use_dns_endpoint: Use the DNS address as the endpoint.
     :param project_id: The Google Developers Console project id
     :param gcp_conn_id: The Google cloud connection id to use. This allows for
         users to specify a service account.
@@ -1278,6 +1322,7 @@ class GKESuspendJobOperator(GKEOperatorMixin, 
GoogleCloudBaseOperator):
         location: str,
         cluster_name: str,
         use_internal_ip: bool = False,
+        use_dns_endpoint: bool = False,
         project_id: str = PROVIDE_PROJECT_ID,
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
@@ -1293,6 +1338,7 @@ class GKESuspendJobOperator(GKEOperatorMixin, 
GoogleCloudBaseOperator):
         self.cluster_name = cluster_name
         self.gcp_conn_id = gcp_conn_id
         self.use_internal_ip = use_internal_ip
+        self.use_dns_endpoint = use_dns_endpoint
         self.impersonation_chain = impersonation_chain
         self.job: V1Job | None = None
 
@@ -1326,6 +1372,7 @@ class GKEResumeJobOperator(GKEOperatorMixin, 
GoogleCloudBaseOperator):
         cluster resides, e.g. 'us-central1-a'
     :param cluster_name: The name of the Google Kubernetes Engine cluster.
     :param use_internal_ip: Use the internal IP address as the endpoint.
+    :param use_dns_endpoint: Use the DNS address as the endpoint.
     :param project_id: The Google Developers Console project id
     :param gcp_conn_id: The Google cloud connection id to use. This allows for
         users to specify a service account.
@@ -1349,6 +1396,7 @@ class GKEResumeJobOperator(GKEOperatorMixin, 
GoogleCloudBaseOperator):
         location: str,
         cluster_name: str,
         use_internal_ip: bool = False,
+        use_dns_endpoint: bool = False,
         project_id: str = PROVIDE_PROJECT_ID,
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
@@ -1364,6 +1412,7 @@ class GKEResumeJobOperator(GKEOperatorMixin, 
GoogleCloudBaseOperator):
         self.cluster_name = cluster_name
         self.gcp_conn_id = gcp_conn_id
         self.use_internal_ip = use_internal_ip
+        self.use_dns_endpoint = use_dns_endpoint
         self.impersonation_chain = impersonation_chain
         self.job: V1Job | None = None
 
diff --git a/providers/google/src/airflow/providers/google/get_provider_info.py 
b/providers/google/src/airflow/providers/google/get_provider_info.py
index 29dccb0845c..29e5899f1d4 100644
--- a/providers/google/src/airflow/providers/google/get_provider_info.py
+++ b/providers/google/src/airflow/providers/google/get_provider_info.py
@@ -1607,7 +1607,7 @@ def get_provider_info():
             "google-cloud-bigtable>=2.17.0",
             "google-cloud-build>=3.31.0",
             "google-cloud-compute>=1.10.0",
-            "google-cloud-container>=2.17.4",
+            "google-cloud-container>=2.52.0",
             "google-cloud-datacatalog>=3.23.0",
             "google-cloud-dataflow-client>=0.8.6",
             "google-cloud-dataform>=0.5.0",
diff --git 
a/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py 
b/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py
index 92311abd18f..d564d983a48 100644
--- 
a/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py
+++ 
b/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py
@@ -78,8 +78,10 @@ TEST_POLL_INTERVAL = 20.0
 GKE_CLUSTER_NAME = "test-cluster-name"
 GKE_CLUSTER_ENDPOINT = "test-host"
 GKE_CLUSTER_PRIVATE_ENDPOINT = "test-private-host"
+GKE_CLUSTER_DNS_ENDPOINT = f"gke-dns-endpoint.{TEST_LOCATION}.gke.goog"
 GKE_CLUSTER_URL = f"https://{GKE_CLUSTER_ENDPOINT}"
 GKE_CLUSTER_PRIVATE_URL = f"https://{GKE_CLUSTER_PRIVATE_ENDPOINT}"
+GKE_CLUSTER_DNS_URL = f"https://{GKE_CLUSTER_DNS_ENDPOINT}"
 GKE_SSL_CA_CERT = "TEST_SSL_CA_CERT_CONTENT"
 
 GKE_CLUSTER_CREATE_BODY_DICT = {
@@ -102,16 +104,57 @@ GKE_OPERATORS_PATH = 
"airflow.providers.google.cloud.operators.kubernetes_engine
 
 class TestGKEClusterAuthDetails:
     @pytest.mark.parametrize(
-        "use_internal_ip, endpoint, private_endpoint, expected_cluster_url",
+        "use_dns_endpoint, use_internal_ip, endpoint, private_endpoint, 
dns_endpoint, expected_cluster_url",
         [
-            (False, GKE_CLUSTER_ENDPOINT, GKE_CLUSTER_PRIVATE_ENDPOINT, 
GKE_CLUSTER_URL),
-            (True, GKE_CLUSTER_ENDPOINT, GKE_CLUSTER_PRIVATE_ENDPOINT, 
GKE_CLUSTER_PRIVATE_URL),
+            (
+                False,
+                False,
+                GKE_CLUSTER_ENDPOINT,
+                GKE_CLUSTER_PRIVATE_ENDPOINT,
+                GKE_CLUSTER_DNS_ENDPOINT,
+                GKE_CLUSTER_URL,
+            ),
+            (
+                False,
+                True,
+                GKE_CLUSTER_ENDPOINT,
+                GKE_CLUSTER_PRIVATE_ENDPOINT,
+                GKE_CLUSTER_DNS_ENDPOINT,
+                GKE_CLUSTER_PRIVATE_URL,
+            ),
+            (
+                True,
+                False,
+                GKE_CLUSTER_ENDPOINT,
+                GKE_CLUSTER_PRIVATE_ENDPOINT,
+                GKE_CLUSTER_DNS_ENDPOINT,
+                GKE_CLUSTER_DNS_URL,
+            ),
+            (
+                True,
+                True,
+                GKE_CLUSTER_ENDPOINT,
+                GKE_CLUSTER_PRIVATE_ENDPOINT,
+                GKE_CLUSTER_DNS_ENDPOINT,
+                GKE_CLUSTER_DNS_URL,
+            ),
         ],
     )
-    def test_fetch_cluster_info(self, use_internal_ip, endpoint, 
private_endpoint, expected_cluster_url):
+    def test_fetch_cluster_info(
+        self,
+        use_dns_endpoint,
+        use_internal_ip,
+        endpoint,
+        private_endpoint,
+        dns_endpoint,
+        expected_cluster_url,
+    ):
         mock_cluster = mock.MagicMock(
             endpoint=endpoint,
             
private_cluster_config=mock.MagicMock(private_endpoint=private_endpoint),
+            control_plane_endpoints_config=mock.MagicMock(
+                dns_endpoint_config=mock.MagicMock(endpoint=dns_endpoint)
+            ),
             master_auth=mock.MagicMock(cluster_ca_certificate=GKE_SSL_CA_CERT),
         )
         mock_cluster_hook = 
mock.MagicMock(get_cluster=mock.MagicMock(return_value=mock_cluster))
@@ -120,6 +163,7 @@ class TestGKEClusterAuthDetails:
             cluster_name=GKE_CLUSTER_NAME,
             project_id=TEST_PROJECT_ID,
             use_internal_ip=use_internal_ip,
+            use_dns_endpoint=use_dns_endpoint,
             cluster_hook=mock_cluster_hook,
         )
 
@@ -140,12 +184,14 @@ class TestGKEOperatorMixin:
         self.operator.impersonation_chain = TEST_IMPERSONATION_CHAIN
         self.operator.project_id = TEST_PROJECT_ID
         self.operator.use_internal_ip = False
+        self.operator.use_dns_endpoint = False
 
     def test_template_fields(self):
         expected_template_fields = {
             "location",
             "cluster_name",
             "use_internal_ip",
+            "use_dns_endpoint",
             "project_id",
             "gcp_conn_id",
             "impersonation_chain",
@@ -180,6 +226,7 @@ class TestGKEOperatorMixin:
             cluster_url=GKE_CLUSTER_URL,
             ssl_ca_cert=GKE_SSL_CA_CERT,
             enable_tcp_keepalive=False,
+            use_dns_endpoint=False,
         )
 
     @mock.patch(GKE_OPERATORS_PATH.format("GKEHook"))
@@ -195,6 +242,7 @@ class TestGKEOperatorMixin:
             cluster_name=self.operator.cluster_name,
             project_id=self.operator.project_id,
             use_internal_ip=self.operator.use_internal_ip,
+            use_dns_endpoint=self.operator.use_dns_endpoint,
             cluster_hook=self.operator.cluster_hook,
         )
         mock_fetch_cluster_info.assert_called_once_with()

Reply via email to