Ohashiro commented on issue #44618:
URL: https://github.com/apache/airflow/issues/44618#issuecomment-2575249301
Hi @dabla
I quickly tested what we discussed, so that the trigger returns to the `execute_complete` method, which checks the message and, if it detects that there was an issue fetching the refresh Id, redirects to a `retry_execution` method that counts the retries and re-executes the `execute` method. Of course the code is not very clean, but here is an overview:
```python
def execute(self, context: Context):
    """Refresh the Power BI Dataset."""
    if self.wait_for_termination:
        self.defer(
            trigger=PowerBITrigger(
                conn_id=self.conn_id,
                group_id=self.group_id,
                dataset_id=self.dataset_id,
                timeout=self.timeout,
                proxies=self.proxies,
                api_version=self.api_version,
                check_interval=self.check_interval,
                wait_for_termination=self.wait_for_termination,
            ),
            method_name=self.get_refresh_status.__name__,
        )

def get_refresh_status(self, context: Context, event: dict[str, str] | None = None):
    """Push the refresh Id to XCom, then run the trigger to wait for refresh completion."""
    if event:
        # Only fail hard when the error is not about a missing/unfetchable refresh Id.
        if (
            event["status"] == "error"
            and "Unable to fetch the details of dataset refresh with Request Id" not in event["message"]
            and "not found" not in event["message"]
        ):
            raise AirflowException(event["message"])
        self.xcom_push(
            context=context,
            key="powerbi_dataset_refresh_Id",
            value=event["dataset_refresh_id"],
        )

    dataset_refresh_id = self.xcom_pull(context=context, key="powerbi_dataset_refresh_Id")
    if dataset_refresh_id:
        self.defer(
            trigger=PowerBITrigger(
                conn_id=self.conn_id,
                group_id=self.group_id,
                dataset_id=self.dataset_id,
                dataset_refresh_id=dataset_refresh_id,
                timeout=self.timeout,
                proxies=self.proxies,
                api_version=self.api_version,
                check_interval=self.check_interval,
                wait_for_termination=self.wait_for_termination,
            ),
            method_name=self.execute_complete.__name__,
        )

def retry_execution(self, context: Context):
    """Re-run the refresh, keeping a retry counter in XCom and failing once max_retries is reached."""
    retries = self.xcom_pull(context=context, key="retries")
    if retries and retries >= self.max_retries:
        raise AirflowException("Max number of retries reached!")
    if not retries:
        retries = 0
    self.xcom_push(context=context, key="retries", value=retries + 1)
    self.get_refresh_status(context)

def execute_complete(self, context: Context, event: dict[str, str]) -> Any:
    """
    Return immediately - callback for when the trigger fires.

    Relies on the trigger to throw an exception, otherwise it assumes execution was successful.
    """
    if event:
        if event["status"] == "error":
            if (
                "Unable to fetch the details of dataset refresh with Request Id" in event["message"]
                or "not found" in event["message"]
            ):
                self.retry_execution(context)
            else:
                raise AirflowException(event["message"])
        self.xcom_push(
            context=context,
            key="powerbi_dataset_refresh_status",
            value=event["status"],
        )
```
Note: in addition to these changes, we should add a new way to handle refresh cancellation. By default, if the trigger encounters an exception, it cancels the refresh, which is not compatible with the retry made by the operator. If we keep this solution, we have to change that behavior.
I think this solution can work, but it might add a little too much complexity to the operator compared to a simple retry, though I do think the separation between triggering the refresh and fetching its status is nice.
What's your opinion?