jason810496 commented on code in PR #48466:
URL: https://github.com/apache/airflow/pull/48466#discussion_r2024215994


##########
providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py:
##########
@@ -213,7 +213,12 @@ def execute(self, context: Context):
                 ):
                     self.log.info("Job run %s has completed successfully.", 
self.run_id)
                 else:
-                    raise DbtCloudJobRunException(f"Job run {self.run_id} has 
failed or has been cancelled.")
+                    # Gather job run details (including run steps), so we can 
output the logs from each step
+                    run_details = self.hook.get_job_run(
+                        run_id=self.run_id, account_id=self.account_id, 
include_related=["run_steps"]
+                    ).json()["data"]
+
+                    raise DbtCloudJobRunDetailsException(self.run_id, 
run_details)

Review Comment:
   We can modularize this part as _raise_dbt_job_run_detail_exception method.



##########
providers/dbt/cloud/src/airflow/providers/dbt/cloud/hooks/dbt.py:
##########
@@ -135,6 +135,63 @@ class DbtCloudJobRunException(AirflowException):
     """An exception that indicates a job run failed to complete."""
 
 
+class DbtCloudJobRunDetailsException(AirflowException):
+    """
+    An exception that indicates a job run failed to complete.
+
+    Provides additional job run details.
+    """
+
+    def __init__(self, run_id: int, run_details: dict):
+        self.run_id = run_id
+        self.run_details = run_details
+
+    def __str__(self):
+        # For extracting only a subset of the higher level job details
+        # id is the run_id
+        run_data_keys = [
+            "id",
+            "job_id",
+            "is_error",
+            "dbt_version",
+            "finished_at",
+            "finished_at_humanized",
+            "run_duration",
+            "run_duration_humanized",
+        ]
+        # For extracting only a subset of the run step details
+        # id is the step id
+        run_steps_keys = [
+            "run_id",
+            "id",
+            "index",
+            "name",
+            "status",
+            "status_humanized",
+            "duration",
+            "duration_humanized",
+            "logs",
+            "debug_logs",
+        ]

Review Comment:
   Nit: How about make these list for filtering keys as properties of the 
exception class? 



##########
providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py:
##########
@@ -239,7 +244,13 @@ def execute(self, context: Context):
                     DbtCloudJobRunStatus.CANCELLED.value,
                     DbtCloudJobRunStatus.ERROR.value,
                 ):
-                    raise DbtCloudJobRunException(f"Job run {self.run_id} has 
failed or has been cancelled.")
+                    # Gather job run details (including run steps), so we can 
output the logs from each step
+                    run_details = self.hook.get_job_run(
+                        run_id=self.run_id, account_id=self.account_id, 
include_related=["run_steps"]
+                    ).json()["data"]
+
+                    raise DbtCloudJobRunDetailsException(self.run_id, 
run_details)
+

Review Comment:
   Then the method can be reused here and several parts below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to