dabla commented on code in PR #60650:
URL: https://github.com/apache/airflow/pull/60650#discussion_r2704780853
##########
providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/data_factory.py:
##########
@@ -74,39 +74,40 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
hook =
AzureDataFactoryAsyncHook(azure_data_factory_conn_id=self.azure_data_factory_conn_id)
executed_after_token_refresh = False
try:
- while True:
- try:
- pipeline_status = await hook.get_adf_pipeline_run_status(
- run_id=self.run_id,
- resource_group_name=self.resource_group_name,
- factory_name=self.factory_name,
- )
- executed_after_token_refresh = False
- if pipeline_status ==
AzureDataFactoryPipelineRunStatus.FAILED:
- yield TriggerEvent(
- {"status": "error", "message": f"Pipeline run
{self.run_id} has Failed."}
+ async with hook:
+ while True:
+ try:
+ pipeline_status = await
hook.get_adf_pipeline_run_status(
+ run_id=self.run_id,
+ resource_group_name=self.resource_group_name,
+ factory_name=self.factory_name,
)
- return
- elif pipeline_status ==
AzureDataFactoryPipelineRunStatus.CANCELLED:
- msg = f"Pipeline run {self.run_id} has been Cancelled."
- yield TriggerEvent({"status": "error", "message": msg})
- return
- elif pipeline_status ==
AzureDataFactoryPipelineRunStatus.SUCCEEDED:
- msg = f"Pipeline run {self.run_id} has been Succeeded."
- yield TriggerEvent({"status": "success", "message":
msg})
- return
- await asyncio.sleep(self.poke_interval)
- except ServiceRequestError:
- # conn might expire during long running pipeline.
- # If exception is caught, it tries to refresh connection
once.
- # If it still doesn't fix the issue,
- # than the execute_after_token_refresh would still be False
- # and an exception will be raised
- if executed_after_token_refresh:
- await hook.refresh_conn()
executed_after_token_refresh = False
- else:
- raise
+ if pipeline_status ==
AzureDataFactoryPipelineRunStatus.FAILED:
Review Comment:
Maybe write a helper method which constructs the TriggerEvent based on the
pipeline_status, so that you could yield the resulting event from it,
encapsulating the if/else in a dedicated method.
```
def trigger_event_from_pipeline_status(self, pipeline_status: AzureDataFactoryPipelineRunStatus) -> TriggerEvent:
    if pipeline_status == AzureDataFactoryPipelineRunStatus.FAILED:
        return TriggerEvent(
            {"status": "error", "message": f"Pipeline run {self.run_id} has Failed."}
        )
    if pipeline_status == AzureDataFactoryPipelineRunStatus.CANCELLED:
        msg = f"Pipeline run {self.run_id} has been Cancelled."
        return TriggerEvent({"status": "error", "message": msg})
    if pipeline_status == AzureDataFactoryPipelineRunStatus.SUCCEEDED:
        msg = f"Pipeline run {self.run_id} has been Succeeded."
        return TriggerEvent({"status": "success", "message": msg})
...
event = self.trigger_event_from_pipeline_status(pipeline_status)
yield event
```
##########
providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/data_factory.py:
##########
@@ -164,70 +165,73 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
"""Make async connection to Azure Data Factory, polls for the pipeline
run status."""
hook =
AzureDataFactoryAsyncHook(azure_data_factory_conn_id=self.azure_data_factory_conn_id)
try:
- pipeline_status = await hook.get_adf_pipeline_run_status(
- run_id=self.run_id,
- resource_group_name=self.resource_group_name,
- factory_name=self.factory_name,
- )
- executed_after_token_refresh = True
- if self.wait_for_termination:
- while self.end_time > time.time():
- try:
- pipeline_status = await
hook.get_adf_pipeline_run_status(
- run_id=self.run_id,
- resource_group_name=self.resource_group_name,
- factory_name=self.factory_name,
- )
- executed_after_token_refresh = True
- if pipeline_status in
AzureDataFactoryPipelineRunStatus.FAILURE_STATES:
- yield TriggerEvent(
- {
- "status": "error",
- "message": f"The pipeline run
{self.run_id} has {pipeline_status}.",
- "run_id": self.run_id,
- }
+ async with hook:
Review Comment:
Same here:
`async with AzureDataFactoryAsyncHook(azure_data_factory_conn_id=self.azure_data_factory_conn_id) as hook:`
##########
providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/data_factory.py:
##########
@@ -240,4 +244,6 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
)
except Exception:
self.log.exception("Failed to cancel pipeline run %s",
self.run_id)
+ finally:
+ await hook.close()
Review Comment:
Do we still need to close when we use the async context manager? I would
expect this is now handled by the context manager?
##########
providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/data_factory.py:
##########
@@ -74,39 +74,40 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
hook =
AzureDataFactoryAsyncHook(azure_data_factory_conn_id=self.azure_data_factory_conn_id)
executed_after_token_refresh = False
try:
- while True:
- try:
- pipeline_status = await hook.get_adf_pipeline_run_status(
- run_id=self.run_id,
- resource_group_name=self.resource_group_name,
- factory_name=self.factory_name,
- )
- executed_after_token_refresh = False
- if pipeline_status ==
AzureDataFactoryPipelineRunStatus.FAILED:
- yield TriggerEvent(
- {"status": "error", "message": f"Pipeline run
{self.run_id} has Failed."}
+ async with hook:
Review Comment:
Wouldn't it be nicer to write this like:
`async with AzureDataFactoryAsyncHook(azure_data_factory_conn_id=self.azure_data_factory_conn_id) as hook:`
##########
providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/data_factory.py:
##########
@@ -164,70 +165,73 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
"""Make async connection to Azure Data Factory, polls for the pipeline
run status."""
hook =
AzureDataFactoryAsyncHook(azure_data_factory_conn_id=self.azure_data_factory_conn_id)
try:
- pipeline_status = await hook.get_adf_pipeline_run_status(
- run_id=self.run_id,
- resource_group_name=self.resource_group_name,
- factory_name=self.factory_name,
- )
- executed_after_token_refresh = True
- if self.wait_for_termination:
- while self.end_time > time.time():
- try:
- pipeline_status = await
hook.get_adf_pipeline_run_status(
- run_id=self.run_id,
- resource_group_name=self.resource_group_name,
- factory_name=self.factory_name,
- )
- executed_after_token_refresh = True
- if pipeline_status in
AzureDataFactoryPipelineRunStatus.FAILURE_STATES:
- yield TriggerEvent(
- {
- "status": "error",
- "message": f"The pipeline run
{self.run_id} has {pipeline_status}.",
- "run_id": self.run_id,
- }
+ async with hook:
+ pipeline_status = await hook.get_adf_pipeline_run_status(
+ run_id=self.run_id,
+ resource_group_name=self.resource_group_name,
+ factory_name=self.factory_name,
+ )
+ executed_after_token_refresh = True
+ if self.wait_for_termination:
+ while self.end_time > time.time():
+ try:
+ pipeline_status = await
hook.get_adf_pipeline_run_status(
+ run_id=self.run_id,
+ resource_group_name=self.resource_group_name,
+ factory_name=self.factory_name,
)
- return
- elif pipeline_status ==
AzureDataFactoryPipelineRunStatus.SUCCEEDED:
Review Comment:
Same remark here regarding building trigger event
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]