VladaZakharova commented on code in PR #37598:
URL: https://github.com/apache/airflow/pull/37598#discussion_r1499135451


##########
airflow/providers/google/cloud/operators/kubernetes_engine.py:
##########
@@ -898,3 +899,232 @@ def execute(self, context: Context):
         ).fetch_cluster_info()
 
         return super().execute(context)
+
+
+class GKEDescribeJobOperator(GoogleCloudBaseOperator):
+    """
+    Retrieve information about Job by given name.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:GKEDescribeJobOperator`
+
+    :param job_name: The name of the resource to delete, in this case cluster 
name.
+    :param project_id: The Google Developers Console project id.
+    :param location: The name of the Google Kubernetes Engine zone or region 
in which the cluster
+        resides.
+    :param cluster_name: The name of the Google Kubernetes Engine cluster.
+    :param namespace: The name of the Google Kubernetes Engine namespace.
+    :param use_internal_ip: Use the internal IP address as the endpoint.
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "gcp_conn_id",
+        "job_name",
+        "namespace",
+        "cluster_name",
+        "location",
+        "impersonation_chain",
+    )
+    operator_extra_links = (KubernetesEngineJobLink(),)
+
+    def __init__(
+        self,
+        *,
+        job_name: str,
+        location: str,
+        namespace: str,
+        cluster_name: str,
+        project_id: str | None = None,
+        use_internal_ip: bool = False,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.location = location
+        self.job_name = job_name
+        self.namespace = namespace
+        self.cluster_name = cluster_name
+        self.use_internal_ip = use_internal_ip
+        self.impersonation_chain = impersonation_chain
+
+        self.job: V1Job | None = None
+
+        self._ssl_ca_cert: str | None = None
+        self._cluster_url: str | None = None
+
+        if self.gcp_conn_id is None:
+            raise AirflowException(
+                "The gcp_conn_id parameter has become required. If you want to 
use Application Default "
+                "Credentials (ADC) strategy for authorization, create an empty 
connection "
+                "called `google_cloud_default`.",
+            )
+
+    @cached_property
+    def cluster_hook(self) -> GKEHook:
+        return GKEHook(
+            gcp_conn_id=self.gcp_conn_id,
+            location=self.location,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+    @cached_property
+    def hook(self) -> GKEJobHook:
+        if self._cluster_url is None or self._ssl_ca_cert is None:
+            raise AttributeError(
+                "Cluster url and ssl_ca_cert should be defined before using 
self.hook method. "
+                "Try to use self.get_kube_creds method",
+            )
+
+        hook = GKEJobHook(
+            gcp_conn_id=self.gcp_conn_id,
+            cluster_url=self._cluster_url,
+            ssl_ca_cert=self._ssl_ca_cert,
+        )
+        return hook
+
+    def execute(self, context: Context) -> None:
+        self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+            use_internal_ip=self.use_internal_ip,
+            cluster_hook=self.cluster_hook,
+        ).fetch_cluster_info()
+
+        self.job = self.hook.get_job(job_name=self.job_name, 
namespace=self.namespace)
+        self.log.info(
+            "Retrieved description of Job %s from cluster %s:\n %s",
+            self.job_name,
+            self.cluster_name,
+            self.job,
+        )
+        ti = context["ti"]
+        ti.xcom_push(key="job_name", value=self.job.metadata.name)
+        ti.xcom_push(key="job_namespace", value=self.job.metadata.namespace)
+        KubernetesEngineJobLink.persist(context=context, task_instance=self)
+        return None
+
+
+class GKEListJobsOperator(GoogleCloudBaseOperator):
+    """
+    Retrieve list of Jobs.
+
+    If namespace parameter is specified, the list of Jobs from dedicated
+    namespace will be retrieved. If no namespace specified, it will output 
Jobs from all namespaces.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:GKEListJobsOperator`
+
+    :param project_id: The Google Developers Console project id.
+    :param location: The name of the Google Kubernetes Engine zone or region 
in which the cluster
+        resides.
+    :param cluster_name: The name of the Google Kubernetes Engine cluster.
+    :param namespace: The name of the Google Kubernetes Engine namespace.
+    :param use_internal_ip: Use the internal IP address as the endpoint.
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "gcp_conn_id",
+        "namespace",
+        "cluster_name",
+        "location",
+        "impersonation_chain",
+    )
+    operator_extra_links = (KubernetesEngineWorkloadsLink(),)
+
+    def __init__(
+        self,
+        *,
+        location: str,
+        cluster_name: str,
+        namespace: str | None = None,
+        project_id: str | None = None,
+        use_internal_ip: bool = False,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.location = location
+        self.namespace = namespace
+        self.cluster_name = cluster_name
+        self.use_internal_ip = use_internal_ip
+        self.impersonation_chain = impersonation_chain
+
+        self._ssl_ca_cert: str | None = None
+        self._cluster_url: str | None = None
+
+        if self.gcp_conn_id is None:
+            raise AirflowException(
+                "The gcp_conn_id parameter has become required. If you want to 
use Application Default "
+                "Credentials (ADC) strategy for authorization, create an empty 
connection "
+                "called `google_cloud_default`.",
+            )
+
+    @cached_property
+    def cluster_hook(self) -> GKEHook:
+        return GKEHook(
+            gcp_conn_id=self.gcp_conn_id,
+            location=self.location,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+    @cached_property
+    def hook(self) -> GKEJobHook:
+        if self._cluster_url is None or self._ssl_ca_cert is None:
+            raise AttributeError(
+                "Cluster url and ssl_ca_cert should be defined before using 
self.hook method. "
+                "Try to use self.get_kube_creds method",
+            )

Review Comment:
   good idea, thank you!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to