[GitHub] [airflow] phanikumv commented on a diff in pull request #29695: Add `DbtCloudJobRunAsyncSensor`

2023-02-22 Thread via GitHub


phanikumv commented on code in PR #29695:
URL: https://github.com/apache/airflow/pull/29695#discussion_r1115301175


##
airflow/providers/dbt/cloud/sensors/dbt.py:
##
@@ -64,3 +72,56 @@ def poke(self, context: Context) -> bool:
 raise DbtCloudJobRunException(f"Job run {self.run_id} has been 
cancelled.")
 
 return job_run_status == DbtCloudJobRunStatus.SUCCESS.value
+
+
+class DbtCloudJobRunAsyncSensor(DbtCloudJobRunSensor):
+"""
+Checks the status of a dbt Cloud job run asynchronously.
+
+.. seealso::
+For more information on sync Sensor DbtCloudJobRunAsyncSensor, take a 
look at the guide::
+:ref:`howto/operator:DbtCloudJobRunAsyncSensor`
+
+:param dbt_cloud_conn_id: The connection identifier for connecting to dbt 
Cloud.
+:param run_id: The job run identifier.
+:param account_id: The dbt Cloud account identifier.
+:param timeout: Time in seconds to wait for a job run to reach a terminal 
status. Defaults to 7 days.
+"""
+
+def __init__(
+self,
+*,
+poll_interval: float = 5,
+timeout: float = 60 * 60 * 24 * 7,
+**kwargs: Any,
+):
+self.poll_interval = poll_interval
+self.timeout = timeout
+super().__init__(**kwargs)
+
+def execute(self, context: Context) -> None:
+"""Defers to Trigger class to poll for state of the job run until
+it reaches a failure state or success state"""
+end_time = time.time() + self.timeout
+self.defer(
+timeout=self.execution_timeout,
+trigger=DbtCloudRunJobTrigger(
+run_id=self.run_id,
+conn_id=self.dbt_cloud_conn_id,
+account_id=self.account_id,
+poll_interval=self.poll_interval,
+end_time=end_time,
+),
+method_name="execute_complete",
+)
+
+def execute_complete(self, context: Context, event: Dict[str, Any]) -> int:
+"""
+Callback for when the trigger fires - returns immediately.
+Relies on trigger to throw an exception, otherwise it assumes 
execution was
+successful.
+"""
+if event["status"] in ["error", "cancelled"]:
+raise AirflowException(event["message"])

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] phanikumv commented on a diff in pull request #29695: Add `DbtCloudJobRunAsyncSensor`

2023-02-22 Thread via GitHub


phanikumv commented on code in PR #29695:
URL: https://github.com/apache/airflow/pull/29695#discussion_r1115300047


##
airflow/providers/dbt/cloud/sensors/dbt.py:
##
@@ -64,3 +72,56 @@ def poke(self, context: Context) -> bool:
 raise DbtCloudJobRunException(f"Job run {self.run_id} has been 
cancelled.")
 
 return job_run_status == DbtCloudJobRunStatus.SUCCESS.value
+
+
+class DbtCloudJobRunAsyncSensor(DbtCloudJobRunSensor):
+"""
+Checks the status of a dbt Cloud job run asynchronously.
+
+.. seealso::
+For more information on sync Sensor DbtCloudJobRunAsyncSensor, take a 
look at the guide::
+:ref:`howto/operator:DbtCloudJobRunAsyncSensor`
+
+:param dbt_cloud_conn_id: The connection identifier for connecting to dbt 
Cloud.
+:param run_id: The job run identifier.
+:param account_id: The dbt Cloud account identifier.
+:param timeout: Time in seconds to wait for a job run to reach a terminal 
status. Defaults to 7 days.
+"""
+
+def __init__(
+self,
+*,
+poll_interval: float = 5,
+timeout: float = 60 * 60 * 24 * 7,
+**kwargs: Any,
+):
+self.poll_interval = poll_interval
+self.timeout = timeout
+super().__init__(**kwargs)
+
+def execute(self, context: Context) -> None:
+"""Defers to Trigger class to poll for state of the job run until
+it reaches a failure state or success state"""

Review Comment:
   fixed



##
airflow/providers/dbt/cloud/sensors/dbt.py:
##
@@ -64,3 +72,56 @@ def poke(self, context: Context) -> bool:
 raise DbtCloudJobRunException(f"Job run {self.run_id} has been 
cancelled.")
 
 return job_run_status == DbtCloudJobRunStatus.SUCCESS.value
+
+
+class DbtCloudJobRunAsyncSensor(DbtCloudJobRunSensor):
+"""
+Checks the status of a dbt Cloud job run asynchronously.
+
+.. seealso::
+For more information on sync Sensor DbtCloudJobRunAsyncSensor, take a 
look at the guide::
+:ref:`howto/operator:DbtCloudJobRunAsyncSensor`
+
+:param dbt_cloud_conn_id: The connection identifier for connecting to dbt 
Cloud.
+:param run_id: The job run identifier.
+:param account_id: The dbt Cloud account identifier.
+:param timeout: Time in seconds to wait for a job run to reach a terminal 
status. Defaults to 7 days.
+"""
+
+def __init__(
+self,
+*,
+poll_interval: float = 5,
+timeout: float = 60 * 60 * 24 * 7,
+**kwargs: Any,
+):
+self.poll_interval = poll_interval
+self.timeout = timeout
+super().__init__(**kwargs)
+
+def execute(self, context: Context) -> None:
+"""Defers to Trigger class to poll for state of the job run until
+it reaches a failure state or success state"""
+end_time = time.time() + self.timeout
+self.defer(
+timeout=self.execution_timeout,
+trigger=DbtCloudRunJobTrigger(
+run_id=self.run_id,
+conn_id=self.dbt_cloud_conn_id,
+account_id=self.account_id,
+poll_interval=self.poll_interval,
+end_time=end_time,
+),
+method_name="execute_complete",
+)
+
+def execute_complete(self, context: Context, event: Dict[str, Any]) -> int:

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] phanikumv commented on a diff in pull request #29695: Add `DbtCloudJobRunAsyncSensor`

2023-02-22 Thread via GitHub


phanikumv commented on code in PR #29695:
URL: https://github.com/apache/airflow/pull/29695#discussion_r1115299464


##
airflow/providers/dbt/cloud/sensors/dbt.py:
##
@@ -64,3 +72,56 @@ def poke(self, context: Context) -> bool:
 raise DbtCloudJobRunException(f"Job run {self.run_id} has been 
cancelled.")
 
 return job_run_status == DbtCloudJobRunStatus.SUCCESS.value
+
+
+class DbtCloudJobRunAsyncSensor(DbtCloudJobRunSensor):
+"""
+Checks the status of a dbt Cloud job run asynchronously.
+
+.. seealso::
+For more information on sync Sensor DbtCloudJobRunAsyncSensor, take a 
look at the guide::
+:ref:`howto/operator:DbtCloudJobRunAsyncSensor`
+
+:param dbt_cloud_conn_id: The connection identifier for connecting to dbt 
Cloud.
+:param run_id: The job run identifier.
+:param account_id: The dbt Cloud account identifier.
+:param timeout: Time in seconds to wait for a job run to reach a terminal 
status. Defaults to 7 days.
+"""
+
+def __init__(
+self,
+*,
+poll_interval: float = 5,

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] phanikumv commented on a diff in pull request #29695: Add `DbtCloudJobRunAsyncSensor`

2023-02-22 Thread via GitHub


phanikumv commented on code in PR #29695:
URL: https://github.com/apache/airflow/pull/29695#discussion_r1115297396


##
airflow/providers/dbt/cloud/sensors/dbt.py:
##
@@ -64,3 +72,56 @@ def poke(self, context: Context) -> bool:
 raise DbtCloudJobRunException(f"Job run {self.run_id} has been 
cancelled.")
 
 return job_run_status == DbtCloudJobRunStatus.SUCCESS.value
+
+
+class DbtCloudJobRunAsyncSensor(DbtCloudJobRunSensor):
+"""
+Checks the status of a dbt Cloud job run asynchronously.
+
+.. seealso::
+For more information on sync Sensor DbtCloudJobRunAsyncSensor, take a 
look at the guide::

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org