jaklan commented on code in PR #45634:
URL: https://github.com/apache/airflow/pull/45634#discussion_r1919154731


##########
providers/src/airflow/providers/dbt/cloud/hooks/dbt.py:
##########
@@ -411,6 +466,72 @@ def get_job(self, job_id: int, account_id: int | None = 
None) -> Response:
         """
         return 
self._run_and_get_response(endpoint=f"{account_id}/jobs/{job_id}")
 
+    @fallback_to_default_account
+    def get_job_by_name(
+        self, project_name: str, environment_name: str, job_name: str, 
account_id: int | None = None
+    ) -> dict:
+        """
+        Retrieve metadata for a specific job by combination of project, 
environment, and job name.
+
+        Raises AirflowException if the job is not found or cannot be uniquely 
identified by provided parameters.
+
+        :param project_name: The name of a dbt Cloud project.
+        :param environment_name: The name of a dbt Cloud environment.
+        :param job_name: The name of a dbt Cloud job.
+        :param account_id: Optional. The ID of a dbt Cloud account.
+        :return: The details of a job.
+        """
+        # get project_id using project_name
+        projects = self.list_projects(name_contains=project_name, 
account_id=account_id)
+        # flatten & filter the list of responses
+        projects = [
+            project
+            for response in projects
+            for project in response.json()["data"]
+            if project["name"] == project_name
+        ]
+        if len(projects) != 1:
+            raise AirflowException(f"Found {len(projects)} projects with name 
`{project_name}`.")
+        project_id = projects[0]["id"]
+
+        # get environment_id using project_id and environment_name
+        environments = self.list_environments(
+            project_id=project_id, name_contains=environment_name, 
account_id=account_id
+        )
+        # flatten & filter the list of responses
+        environments = [
+            env
+            for response in environments

Review Comment:
   Same as above



##########
providers/src/airflow/providers/dbt/cloud/hooks/dbt.py:
##########
@@ -411,6 +466,72 @@ def get_job(self, job_id: int, account_id: int | None = 
None) -> Response:
         """
         return 
self._run_and_get_response(endpoint=f"{account_id}/jobs/{job_id}")
 
+    @fallback_to_default_account
+    def get_job_by_name(
+        self, project_name: str, environment_name: str, job_name: str, 
account_id: int | None = None
+    ) -> dict:
+        """
+        Retrieve metadata for a specific job by combination of project, 
environment, and job name.
+
+        Raises AirflowException if the job is not found or cannot be uniquely 
identified by provided parameters.
+
+        :param project_name: The name of a dbt Cloud project.
+        :param environment_name: The name of a dbt Cloud environment.
+        :param job_name: The name of a dbt Cloud job.
+        :param account_id: Optional. The ID of a dbt Cloud account.
+        :return: The details of a job.
+        """
+        # get project_id using project_name
+        projects = self.list_projects(name_contains=project_name, 
account_id=account_id)
+        # flatten & filter the list of responses
+        projects = [
+            project
+            for response in projects
+            for project in response.json()["data"]
+            if project["name"] == project_name
+        ]
+        if len(projects) != 1:
+            raise AirflowException(f"Found {len(projects)} projects with name 
`{project_name}`.")
+        project_id = projects[0]["id"]
+
+        # get environment_id using project_id and environment_name
+        environments = self.list_environments(
+            project_id=project_id, name_contains=environment_name, 
account_id=account_id
+        )
+        # flatten & filter the list of responses
+        environments = [
+            env
+            for response in environments
+            for env in response.json()["data"]
+            if env["name"] == environment_name
+        ]
+        if len(environments) != 1:
+            raise AirflowException(
+                f"Found {len(environments)} environments with name 
`{environment_name}` in project `{project_name}`."
+            )
+        environment_id = environments[0]["id"]
+
+        # get job using project_id, environment_id and job_name
+        list_jobs_responses = self.list_jobs(
+            project_id=project_id,
+            environment_id=environment_id,
+            name_contains=job_name,
+            account_id=account_id,
+        )
+        # flatten & filter the list of responses
+        jobs = [
+            job
+            for response in list_jobs_responses
+            for job in response.json()["data"]
+            if job["name"] == job_name
+        ]
+        if len(jobs) != 1:
+            raise AirflowException(
+                f"Found {len(jobs)} jobs with name `{job_name}` in project 
`{project_name}` and environment `{environment_name}`."

Review Comment:
   ```
   f"Found {len(jobs)} jobs with name `{job_name}` in environment 
`{environment_name}` in project `{project_name}`."
   ```



##########
providers/src/airflow/providers/dbt/cloud/hooks/dbt.py:
##########
@@ -411,6 +466,72 @@ def get_job(self, job_id: int, account_id: int | None = 
None) -> Response:
         """
         return 
self._run_and_get_response(endpoint=f"{account_id}/jobs/{job_id}")
 
+    @fallback_to_default_account
+    def get_job_by_name(
+        self, project_name: str, environment_name: str, job_name: str, 
account_id: int | None = None
+    ) -> dict:
+        """
+        Retrieve metadata for a specific job by combination of project, 
environment, and job name.
+
+        Raises AirflowException if the job is not found or cannot be uniquely 
identified by provided parameters.
+
+        :param project_name: The name of a dbt Cloud project.
+        :param environment_name: The name of a dbt Cloud environment.
+        :param job_name: The name of a dbt Cloud job.
+        :param account_id: Optional. The ID of a dbt Cloud account.
+        :return: The details of a job.
+        """
+        # get project_id using project_name
+        projects = self.list_projects(name_contains=project_name, 
account_id=account_id)
+        # flatten & filter the list of responses
+        projects = [
+            project
+            for response in projects

Review Comment:
   `for response in responses`



##########
providers/src/airflow/providers/dbt/cloud/hooks/dbt.py:
##########
@@ -411,6 +466,72 @@ def get_job(self, job_id: int, account_id: int | None = 
None) -> Response:
         """
         return 
self._run_and_get_response(endpoint=f"{account_id}/jobs/{job_id}")
 
+    @fallback_to_default_account
+    def get_job_by_name(
+        self, project_name: str, environment_name: str, job_name: str, 
account_id: int | None = None
+    ) -> dict:
+        """
+        Retrieve metadata for a specific job by combination of project, 
environment, and job name.
+
+        Raises AirflowException if the job is not found or cannot be uniquely 
identified by provided parameters.
+
+        :param project_name: The name of a dbt Cloud project.
+        :param environment_name: The name of a dbt Cloud environment.
+        :param job_name: The name of a dbt Cloud job.
+        :param account_id: Optional. The ID of a dbt Cloud account.
+        :return: The details of a job.
+        """
+        # get project_id using project_name
+        projects = self.list_projects(name_contains=project_name, 
account_id=account_id)

Review Comment:
   `responses = ...`



##########
providers/src/airflow/providers/dbt/cloud/hooks/dbt.py:
##########
@@ -411,6 +466,72 @@ def get_job(self, job_id: int, account_id: int | None = 
None) -> Response:
         """
         return 
self._run_and_get_response(endpoint=f"{account_id}/jobs/{job_id}")
 
+    @fallback_to_default_account
+    def get_job_by_name(
+        self, project_name: str, environment_name: str, job_name: str, 
account_id: int | None = None
+    ) -> dict:
+        """
+        Retrieve metadata for a specific job by combination of project, 
environment, and job name.
+
+        Raises AirflowException if the job is not found or cannot be uniquely 
identified by provided parameters.
+
+        :param project_name: The name of a dbt Cloud project.
+        :param environment_name: The name of a dbt Cloud environment.
+        :param job_name: The name of a dbt Cloud job.
+        :param account_id: Optional. The ID of a dbt Cloud account.
+        :return: The details of a job.
+        """
+        # get project_id using project_name
+        projects = self.list_projects(name_contains=project_name, 
account_id=account_id)
+        # flatten & filter the list of responses
+        projects = [
+            project
+            for response in projects
+            for project in response.json()["data"]
+            if project["name"] == project_name
+        ]
+        if len(projects) != 1:
+            raise AirflowException(f"Found {len(projects)} projects with name 
`{project_name}`.")
+        project_id = projects[0]["id"]
+
+        # get environment_id using project_id and environment_name
+        environments = self.list_environments(
+            project_id=project_id, name_contains=environment_name, 
account_id=account_id
+        )
+        # flatten & filter the list of responses
+        environments = [
+            env
+            for response in environments
+            for env in response.json()["data"]
+            if env["name"] == environment_name
+        ]
+        if len(environments) != 1:
+            raise AirflowException(
+                f"Found {len(environments)} environments with name 
`{environment_name}` in project `{project_name}`."
+            )
+        environment_id = environments[0]["id"]
+
+        # get job using project_id, environment_id and job_name
+        list_jobs_responses = self.list_jobs(
+            project_id=project_id,
+            environment_id=environment_id,
+            name_contains=job_name,
+            account_id=account_id,
+        )
+        # flatten & filter the list of responses
+        jobs = [
+            job
+            for response in list_jobs_responses

Review Comment:
   Same as above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to