jason810496 commented on code in PR #48466: URL: https://github.com/apache/airflow/pull/48466#discussion_r2024215994
########## providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py: ########## @@ -213,7 +213,12 @@ def execute(self, context: Context): ): self.log.info("Job run %s has completed successfully.", self.run_id) else: - raise DbtCloudJobRunException(f"Job run {self.run_id} has failed or has been cancelled.") + # Gather job run details (including run steps), so we can output the logs from each step + run_details = self.hook.get_job_run( + run_id=self.run_id, account_id=self.account_id, include_related=["run_steps"] + ).json()["data"] + + raise DbtCloudJobRunDetailsException(self.run_id, run_details) Review Comment: We can modularize this part as _raise_dbt_job_run_detail_exception method. ########## providers/dbt/cloud/src/airflow/providers/dbt/cloud/hooks/dbt.py: ########## @@ -135,6 +135,63 @@ class DbtCloudJobRunException(AirflowException): """An exception that indicates a job run failed to complete.""" +class DbtCloudJobRunDetailsException(AirflowException): + """ + An exception that indicates a job run failed to complete. + + Provides additional job run details. + """ + + def __init__(self, run_id: int, run_details: dict): + self.run_id = run_id + self.run_details = run_details + + def __str__(self): + # For extracting only a subset of the higher level job details + # id is the run_id + run_data_keys = [ + "id", + "job_id", + "is_error", + "dbt_version", + "finished_at", + "finished_at_humanized", + "run_duration", + "run_duration_humanized", + ] + # For extracting only a subset of the run step details + # id is the step id + run_steps_keys = [ + "run_id", + "id", + "index", + "name", + "status", + "status_humanized", + "duration", + "duration_humanized", + "logs", + "debug_logs", + ] Review Comment: Nit: How about make these list for filtering keys as properties of the exception class? ########## providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py: ########## @@ -239,7 +244,13 @@ def execute(self, context: Context): DbtCloudJobRunStatus.CANCELLED.value, DbtCloudJobRunStatus.ERROR.value, ): - raise DbtCloudJobRunException(f"Job run {self.run_id} has failed or has been cancelled.") + # Gather job run details (including run steps), so we can output the logs from each step + run_details = self.hook.get_job_run( + run_id=self.run_id, account_id=self.account_id, include_related=["run_steps"] + ).json()["data"] + + raise DbtCloudJobRunDetailsException(self.run_id, run_details) + Review Comment: Then the method can be reused here and several parts below. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org