dabla commented on code in PR #60650:
URL: https://github.com/apache/airflow/pull/60650#discussion_r2709518476
##########
providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/data_factory.py:
##########
@@ -69,44 +69,51 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
             },
         )
 
+    def _build_trigger_event(self, pipeline_status: str) -> TriggerEvent | None:
+        """Build TriggerEvent based on pipeline status. Returns None if status is not terminal."""
+        if pipeline_status == AzureDataFactoryPipelineRunStatus.FAILED:
+            return TriggerEvent({"status": "error", "message": f"Pipeline run {self.run_id} has Failed."})
+        if pipeline_status == AzureDataFactoryPipelineRunStatus.CANCELLED:
+            return TriggerEvent(
+                {"status": "error", "message": f"Pipeline run {self.run_id} has been Cancelled."}
+            )
+        if pipeline_status == AzureDataFactoryPipelineRunStatus.SUCCEEDED:
+            return TriggerEvent(
+                {"status": "success", "message": f"Pipeline run {self.run_id} has been Succeeded."}
+            )
+        return None
+
     async def run(self) -> AsyncIterator[TriggerEvent]:
         """Make async connection to Azure Data Factory, polls for the pipeline run status."""
-        hook = AzureDataFactoryAsyncHook(azure_data_factory_conn_id=self.azure_data_factory_conn_id)
         executed_after_token_refresh = False
         try:
-            while True:
-                try:
-                    pipeline_status = await hook.get_adf_pipeline_run_status(
-                        run_id=self.run_id,
-                        resource_group_name=self.resource_group_name,
-                        factory_name=self.factory_name,
-                    )
-                    executed_after_token_refresh = False
-                    if pipeline_status == AzureDataFactoryPipelineRunStatus.FAILED:
-                        yield TriggerEvent(
-                            {"status": "error", "message": f"Pipeline run {self.run_id} has Failed."}
+            async with AzureDataFactoryAsyncHook(
+                azure_data_factory_conn_id=self.azure_data_factory_conn_id
+            ) as hook:
+                while True:
+                    try:
+                        pipeline_status = await hook.get_adf_pipeline_run_status(
+                            run_id=self.run_id,
+                            resource_group_name=self.resource_group_name,
+                            factory_name=self.factory_name,
                         )
-                        return
-                    elif pipeline_status == AzureDataFactoryPipelineRunStatus.CANCELLED:
-                        msg = f"Pipeline run {self.run_id} has been Cancelled."
-                        yield TriggerEvent({"status": "error", "message": msg})
-                        return
-                    elif pipeline_status == AzureDataFactoryPipelineRunStatus.SUCCEEDED:
-                        msg = f"Pipeline run {self.run_id} has been Succeeded."
-                        yield TriggerEvent({"status": "success", "message": msg})
-                        return
-                    await asyncio.sleep(self.poke_interval)
-                except ServiceRequestError:
-                    # conn might expire during long running pipeline.
-                    # If exception is caught, it tries to refresh connection once.
-                    # If it still doesn't fix the issue,
-                    # than the execute_after_token_refresh would still be False
-                    # and an exception will be raised
-                    if executed_after_token_refresh:
-                        await hook.refresh_conn()
                         executed_after_token_refresh = False
-                    else:
-                        raise
+                        event = self._build_trigger_event(pipeline_status)
+                        if event:
+                            yield event
+                            return
+                        await asyncio.sleep(self.poke_interval)
+                    except ServiceRequestError:
+                        # conn might expire during long running pipeline.
+                        # If exception is caught, it tries to refresh connection once.
+                        # If it still doesn't fix the issue,
+                        # than the execute_after_token_refresh would still be False
+                        # and an exception will be raised
+                        if executed_after_token_refresh:
Review Comment:
Nit, but I would write it like this:
```
if not executed_after_token_refresh:
raise
await hook.refresh_conn()
executed_after_token_refresh = False
```
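For readability, here is how the whole handler would then read with the guard clause applied; this is just a sketch assembled from the hunk above, not a tested patch:
```
except ServiceRequestError:
    # conn might expire during long running pipeline.
    # If the connection was already refreshed once and the call
    # still fails, the flag is False and the error is re-raised.
    if not executed_after_token_refresh:
        raise
    await hook.refresh_conn()
    executed_after_token_refresh = False
```
The early `raise` keeps the refresh-and-retry path flat and makes the terminal case explicit.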
##########
providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/data_factory.py:
##########
@@ -160,84 +167,94 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
             },
         )
 
+    def _build_trigger_event(self, pipeline_status: str) -> TriggerEvent | None:
+        """Build TriggerEvent based on pipeline status. Returns None if status is not terminal."""
+        if pipeline_status in AzureDataFactoryPipelineRunStatus.FAILURE_STATES:
+            return TriggerEvent(
+                {
+                    "status": "error",
+                    "message": f"The pipeline run {self.run_id} has {pipeline_status}.",
+                    "run_id": self.run_id,
+                }
+            )
+        if pipeline_status == AzureDataFactoryPipelineRunStatus.SUCCEEDED:
+            return TriggerEvent(
+                {
+                    "status": "success",
+                    "message": f"The pipeline run {self.run_id} has {pipeline_status}.",
+                    "run_id": self.run_id,
+                }
+            )
+        return None
+
     async def run(self) -> AsyncIterator[TriggerEvent]:
         """Make async connection to Azure Data Factory, polls for the pipeline run status."""
-        hook = AzureDataFactoryAsyncHook(azure_data_factory_conn_id=self.azure_data_factory_conn_id)
-        try:
-            pipeline_status = await hook.get_adf_pipeline_run_status(
-                run_id=self.run_id,
-                resource_group_name=self.resource_group_name,
-                factory_name=self.factory_name,
-            )
-            executed_after_token_refresh = True
-            if self.wait_for_termination:
-                while self.end_time > time.time():
+        async with AzureDataFactoryAsyncHook(
+            azure_data_factory_conn_id=self.azure_data_factory_conn_id
+        ) as hook:
+            try:
+                pipeline_status = await hook.get_adf_pipeline_run_status(
+                    run_id=self.run_id,
+                    resource_group_name=self.resource_group_name,
+                    factory_name=self.factory_name,
+                )
+                executed_after_token_refresh = True
+                if self.wait_for_termination:
+                    while self.end_time > time.time():
+                        try:
+                            pipeline_status = await hook.get_adf_pipeline_run_status(
+                                run_id=self.run_id,
+                                resource_group_name=self.resource_group_name,
+                                factory_name=self.factory_name,
+                            )
+                            executed_after_token_refresh = True
+                            event = self._build_trigger_event(pipeline_status)
+                            if event:
+                                yield event
+                                return
+                            self.log.info(
+                                "Sleeping for %s. The pipeline state is %s.",
+                                self.check_interval,
+                                pipeline_status,
+                            )
+                            await asyncio.sleep(self.check_interval)
+                        except ServiceRequestError:
+                            # conn might expire during long running pipeline.
+                            # If exception is caught, it tries to refresh connection once.
+                            # If it still doesn't fix the issue,
+                            # than the execute_after_token_refresh would still be False
+                            # and an exception will be raised
+                            if executed_after_token_refresh:
Review Comment:
Same here:
```
if not executed_after_token_refresh:
raise
await hook.refresh_conn()
executed_after_token_refresh = False
```
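Applied here it is the exact same rewrite; for completeness, the handler inside the polling loop would end up as below (sketch only, untested, names taken from the hunk above):
```
except ServiceRequestError:
    # Same refresh-once logic as in the first hunk above:
    # refresh the connection on the first failure, and re-raise
    # if the retried call fails again.
    if not executed_after_token_refresh:
        raise
    await hook.refresh_conn()
    executed_after_token_refresh = False
```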
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]