This is an automated email from the ASF dual-hosted git repository.

husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 66299338eb add a return when the event is yielded in a loop to stop the execution (#31985)
66299338eb is described below

commit 66299338eb24aa71eb2e27ebd8b76079b39fd305
Author: Hussein Awala <[email protected]>
AuthorDate: Sun Jun 18 11:14:37 2023 +0200

    add a return when the event is yielded in a loop to stop the execution (#31985)
    
    Signed-off-by: Hussein Awala <[email protected]>
---
 airflow/providers/cncf/kubernetes/triggers/pod.py   |  5 +++++
 airflow/providers/databricks/triggers/databricks.py |  1 +
 airflow/providers/google/cloud/triggers/bigquery.py | 21 ++++++++++++++++++++-
 .../providers/google/cloud/triggers/bigquery_dts.py |  8 ++++----
 airflow/providers/google/cloud/triggers/dataflow.py |  4 ++++
 .../providers/google/cloud/triggers/datafusion.py   |  4 +++-
 airflow/providers/google/cloud/triggers/dataproc.py |  5 +++++
 airflow/providers/google/cloud/triggers/gcs.py      |  4 ++++
 .../google/cloud/triggers/kubernetes_engine.py      |  4 +++-
 9 files changed, 49 insertions(+), 7 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/triggers/pod.py b/airflow/providers/cncf/kubernetes/triggers/pod.py
index 74a1f8787e..0a7b41b8c0 100644
--- a/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -141,6 +141,7 @@ class KubernetesPodTrigger(BaseTrigger):
                             "message": "All containers inside pod have started 
successfully.",
                         }
                     )
+                    return
                 elif self.should_wait(pod_phase=pod_status, 
container_state=container_state):
                     self.log.info("Container is not completed and still 
working.")
 
@@ -159,6 +160,7 @@ class KubernetesPodTrigger(BaseTrigger):
                                     "message": message,
                                 }
                             )
+                            return
 
                     self.log.info("Sleeping for %s seconds.", 
self.poll_interval)
                     await asyncio.sleep(self.poll_interval)
@@ -171,6 +173,7 @@ class KubernetesPodTrigger(BaseTrigger):
                             "message": pod.status.message,
                         }
                     )
+                    return
             except CancelledError:
                 # That means that task was marked as failed
                 if self.get_logs:
@@ -193,6 +196,7 @@ class KubernetesPodTrigger(BaseTrigger):
                         "message": "Pod execution was cancelled",
                     }
                 )
+                return
             except Exception as e:
                 self.log.exception("Exception occurred while checking pod 
phase:")
                 yield TriggerEvent(
@@ -203,6 +207,7 @@ class KubernetesPodTrigger(BaseTrigger):
                         "message": str(e),
                     }
                 )
+                return
 
     def _get_async_hook(self) -> AsyncKubernetesHook:
         if self._hook is None:
diff --git a/airflow/providers/databricks/triggers/databricks.py b/airflow/providers/databricks/triggers/databricks.py
index c2400e2c97..e5e56cc0ff 100644
--- a/airflow/providers/databricks/triggers/databricks.py
+++ b/airflow/providers/databricks/triggers/databricks.py
@@ -89,6 +89,7 @@ class DatabricksExecutionTrigger(BaseTrigger):
                             "run_state": run_state.to_json(),
                         }
                     )
+                    return
                 else:
                     self.log.info(
                         "run-id %s in run state %s. sleeping for %s seconds",
diff --git a/airflow/providers/google/cloud/triggers/bigquery.py b/airflow/providers/google/cloud/triggers/bigquery.py
index 990fb3e948..bb3973c338 100644
--- a/airflow/providers/google/cloud/triggers/bigquery.py
+++ b/airflow/providers/google/cloud/triggers/bigquery.py
@@ -88,16 +88,19 @@ class BigQueryInsertJobTrigger(BaseTrigger):
                             "message": "Job completed",
                         }
                     )
+                    return
                 elif response_from_hook == "pending":
                     self.log.info("Query is still running...")
                     self.log.info("Sleeping for %s seconds.", 
self.poll_interval)
                     await asyncio.sleep(self.poll_interval)
                 else:
                     yield TriggerEvent({"status": "error", "message": 
response_from_hook})
+                    return
 
             except Exception as e:
                 self.log.exception("Exception occurred while checking for 
query completion")
                 yield TriggerEvent({"status": "error", "message": str(e)})
+                return
 
     def _get_async_hook(self) -> BigQueryAsyncHook:
         return BigQueryAsyncHook(gcp_conn_id=self.conn_id)
@@ -140,6 +143,7 @@ class BigQueryCheckTrigger(BigQueryInsertJobTrigger):
                                 "records": None,
                             }
                         )
+                        return
                     else:
                         # Extract only first record from the query results
                         first_record = records.pop(0)
@@ -149,6 +153,7 @@ class BigQueryCheckTrigger(BigQueryInsertJobTrigger):
                                 "records": first_record,
                             }
                         )
+                        return
 
                 elif response_from_hook == "pending":
                     self.log.info("Query is still running...")
@@ -156,9 +161,11 @@ class BigQueryCheckTrigger(BigQueryInsertJobTrigger):
                     await asyncio.sleep(self.poll_interval)
                 else:
                     yield TriggerEvent({"status": "error", "message": 
response_from_hook})
+                    return
             except Exception as e:
                 self.log.exception("Exception occurred while checking for 
query completion")
                 yield TriggerEvent({"status": "error", "message": str(e)})
+                return
 
 
 class BigQueryGetDataTrigger(BigQueryInsertJobTrigger):
@@ -206,15 +213,18 @@ class BigQueryGetDataTrigger(BigQueryInsertJobTrigger):
                             "records": records,
                         }
                     )
+                    return
                 elif response_from_hook == "pending":
                     self.log.info("Query is still running...")
                     self.log.info("Sleeping for %s seconds.", 
self.poll_interval)
                     await asyncio.sleep(self.poll_interval)
                 else:
                     yield TriggerEvent({"status": "error", "message": 
response_from_hook})
+                    return
             except Exception as e:
                 self.log.exception("Exception occurred while checking for 
query completion")
                 yield TriggerEvent({"status": "error", "message": str(e)})
+                return
 
 
 class BigQueryIntervalCheckTrigger(BigQueryInsertJobTrigger):
@@ -345,6 +355,7 @@ class BigQueryIntervalCheckTrigger(BigQueryInsertJobTrigger):
                             "second_row_data": second_job_row,
                         }
                     )
+                    return
                 elif first_job_response_from_hook == "pending" or 
second_job_response_from_hook == "pending":
                     self.log.info("Query is still running...")
                     self.log.info("Sleeping for %s seconds.", 
self.poll_interval)
@@ -353,10 +364,12 @@ class BigQueryIntervalCheckTrigger(BigQueryInsertJobTrigger):
                     yield TriggerEvent(
                         {"status": "error", "message": 
second_job_response_from_hook, "data": None}
                     )
+                    return
 
             except Exception as e:
                 self.log.exception("Exception occurred while checking for 
query completion")
                 yield TriggerEvent({"status": "error", "message": str(e)})
+                return
 
 
 class BigQueryValueCheckTrigger(BigQueryInsertJobTrigger):
@@ -428,16 +441,18 @@ class BigQueryValueCheckTrigger(BigQueryInsertJobTrigger):
                     records = records.pop(0) if records else None
                     hook.value_check(self.sql, self.pass_value, records, 
self.tolerance)
                     yield TriggerEvent({"status": "success", "message": "Job 
completed", "records": records})
+                    return
                 elif response_from_hook == "pending":
                     self.log.info("Query is still running...")
                     self.log.info("Sleeping for %s seconds.", 
self.poll_interval)
                     await asyncio.sleep(self.poll_interval)
                 else:
                     yield TriggerEvent({"status": "error", "message": 
response_from_hook, "records": None})
-
+                    return
             except Exception as e:
                 self.log.exception("Exception occurred while checking for 
query completion")
                 yield TriggerEvent({"status": "error", "message": str(e)})
+                return
 
 
 class BigQueryTableExistenceTrigger(BaseTrigger):
@@ -495,10 +510,12 @@ class BigQueryTableExistenceTrigger(BaseTrigger):
                 )
                 if response:
                     yield TriggerEvent({"status": "success", "message": 
"success"})
+                    return
                 await asyncio.sleep(self.poll_interval)
             except Exception as e:
                 self.log.exception("Exception occurred while checking for 
Table existence")
                 yield TriggerEvent({"status": "error", "message": str(e)})
+                return
 
     async def _table_exists(
         self, hook: BigQueryTableAsyncHook, dataset: str, table_id: str, 
project_id: str
@@ -577,9 +594,11 @@ class BigQueryTablePartitionExistenceTrigger(BigQueryTableExistenceTrigger):
                                 "message": f"Partition: {self.partition_id} in 
table: {self.table_id}",
                             }
                         )
+                        return
                     job_id = None
                 elif status == "error":
                     yield TriggerEvent({"status": "error", "message": status})
+                    return
                 self.log.info("Sleeping for %s seconds.", self.poll_interval)
                 await asyncio.sleep(self.poll_interval)
 
diff --git a/airflow/providers/google/cloud/triggers/bigquery_dts.py b/airflow/providers/google/cloud/triggers/bigquery_dts.py
index 354a689445..95e6aed397 100644
--- a/airflow/providers/google/cloud/triggers/bigquery_dts.py
+++ b/airflow/providers/google/cloud/triggers/bigquery_dts.py
@@ -105,7 +105,7 @@ class BigQueryDataTransferRunTrigger(BaseTrigger):
                             "config_id": self.config_id,
                         }
                     )
-
+                    return
                 elif state == TransferState.FAILED:
                     self.log.info("Job has failed")
                     yield TriggerEvent(
@@ -115,7 +115,7 @@ class BigQueryDataTransferRunTrigger(BaseTrigger):
                             "message": "Job has failed",
                         }
                     )
-
+                    return
                 if state == TransferState.CANCELLED:
                     self.log.info("Job has been cancelled.")
                     yield TriggerEvent(
@@ -125,12 +125,11 @@ class BigQueryDataTransferRunTrigger(BaseTrigger):
                             "message": "Job was cancelled",
                         }
                     )
-
+                    return
                 else:
                     self.log.info("Job is still working...")
                     self.log.info("Waiting for %s seconds", self.poll_interval)
                     await asyncio.sleep(self.poll_interval)
-
             except Exception as e:
                 yield TriggerEvent(
                     {
@@ -138,6 +137,7 @@ class BigQueryDataTransferRunTrigger(BaseTrigger):
                         "message": f"Trigger failed with exception: {str(e)}",
                     }
                 )
+                return
 
     def _get_async_hook(self) -> AsyncBiqQueryDataTransferServiceHook:
         return AsyncBiqQueryDataTransferServiceHook(
diff --git a/airflow/providers/google/cloud/triggers/dataflow.py b/airflow/providers/google/cloud/triggers/dataflow.py
index bc04ce64b2..5dfdf5106a 100644
--- a/airflow/providers/google/cloud/triggers/dataflow.py
+++ b/airflow/providers/google/cloud/triggers/dataflow.py
@@ -107,6 +107,7 @@ class TemplateJobStartTrigger(BaseTrigger):
                             "message": "Job completed",
                         }
                     )
+                    return
                 elif status == JobState.JOB_STATE_FAILED:
                     yield TriggerEvent(
                         {
@@ -114,6 +115,7 @@ class TemplateJobStartTrigger(BaseTrigger):
                             "message": f"Dataflow job with id {self.job_id} 
has failed its execution",
                         }
                     )
+                    return
                 elif status == JobState.JOB_STATE_STOPPED:
                     yield TriggerEvent(
                         {
@@ -121,6 +123,7 @@ class TemplateJobStartTrigger(BaseTrigger):
                             "message": f"Dataflow job with id {self.job_id} 
was stopped",
                         }
                     )
+                    return
                 else:
                     self.log.info("Job is still running...")
                     self.log.info("Current job status is: %s", status)
@@ -129,6 +132,7 @@ class TemplateJobStartTrigger(BaseTrigger):
             except Exception as e:
                 self.log.exception("Exception occurred while checking for job 
completion.")
                 yield TriggerEvent({"status": "error", "message": str(e)})
+                return
 
     def _get_async_hook(self) -> AsyncDataflowHook:
         return AsyncDataflowHook(
diff --git a/airflow/providers/google/cloud/triggers/datafusion.py b/airflow/providers/google/cloud/triggers/datafusion.py
index 8e2007d8cb..66ed139f34 100644
--- a/airflow/providers/google/cloud/triggers/datafusion.py
+++ b/airflow/providers/google/cloud/triggers/datafusion.py
@@ -101,16 +101,18 @@ class DataFusionStartPipelineTrigger(BaseTrigger):
                             "message": "Pipeline is running",
                         }
                     )
+                    return
                 elif response_from_hook == "pending":
                     self.log.info("Pipeline is not still in running state...")
                     self.log.info("Sleeping for %s seconds.", 
self.poll_interval)
                     await asyncio.sleep(self.poll_interval)
                 else:
                     yield TriggerEvent({"status": "error", "message": 
response_from_hook})
-
+                    return
             except Exception as e:
                 self.log.exception("Exception occurred while checking for 
pipeline state")
                 yield TriggerEvent({"status": "error", "message": str(e)})
+                return
 
     def _get_async_hook(self) -> DataFusionAsyncHook:
         return DataFusionAsyncHook(
diff --git a/airflow/providers/google/cloud/triggers/dataproc.py b/airflow/providers/google/cloud/triggers/dataproc.py
index 852d5d9b0d..6f2be85124 100644
--- a/airflow/providers/google/cloud/triggers/dataproc.py
+++ b/airflow/providers/google/cloud/triggers/dataproc.py
@@ -279,8 +279,10 @@ class DataprocDeleteClusterTrigger(DataprocBaseTrigger):
                 await asyncio.sleep(self.polling_interval_seconds)
             except NotFound:
                 yield TriggerEvent({"status": "success", "message": ""})
+                return
             except Exception as e:
                 yield TriggerEvent({"status": "error", "message": str(e)})
+                return
         yield TriggerEvent({"status": "error", "message": "Timeout"})
 
 
@@ -322,6 +324,7 @@ class DataprocWorkflowTrigger(DataprocBaseTrigger):
                                 "message": operation.error.message,
                             }
                         )
+                        return
                     yield TriggerEvent(
                         {
                             "operation_name": operation.name,
@@ -330,6 +333,7 @@ class DataprocWorkflowTrigger(DataprocBaseTrigger):
                             "message": "Operation is successfully ended.",
                         }
                     )
+                    return
                 else:
                     self.log.info("Sleeping for %s seconds.", 
self.polling_interval_seconds)
                     await asyncio.sleep(self.polling_interval_seconds)
@@ -341,3 +345,4 @@ class DataprocWorkflowTrigger(DataprocBaseTrigger):
                         "message": str(e),
                     }
                 )
+                return
diff --git a/airflow/providers/google/cloud/triggers/gcs.py b/airflow/providers/google/cloud/triggers/gcs.py
index 18424c7be9..9595cc6ca8 100644
--- a/airflow/providers/google/cloud/triggers/gcs.py
+++ b/airflow/providers/google/cloud/triggers/gcs.py
@@ -79,6 +79,7 @@ class GCSBlobTrigger(BaseTrigger):
                 )
                 if res == "success":
                     yield TriggerEvent({"status": "success", "message": res})
+                    return
                 await asyncio.sleep(self.poke_interval)
         except Exception as e:
             yield TriggerEvent({"status": "error", "message": str(e)})
@@ -159,6 +160,7 @@ class GCSCheckBlobUpdateTimeTrigger(BaseTrigger):
                 )
                 if status:
                     yield TriggerEvent(res)
+                    return
                 await asyncio.sleep(self.poke_interval)
         except Exception as e:
             yield TriggerEvent({"status": "error", "message": str(e)})
@@ -262,6 +264,7 @@ class GCSPrefixBlobTrigger(GCSBlobTrigger):
                     yield TriggerEvent(
                         {"status": "success", "message": "Successfully 
completed", "matches": res}
                     )
+                    return
                 await asyncio.sleep(self.poke_interval)
         except Exception as e:
             yield TriggerEvent({"status": "error", "message": str(e)})
@@ -364,6 +367,7 @@ class GCSUploadSessionTrigger(GCSPrefixBlobTrigger):
                 res = self._is_bucket_updated(set(list_blobs))
                 if res["status"] in ("success", "error"):
                     yield TriggerEvent(res)
+                    return
                 await asyncio.sleep(self.poke_interval)
         except Exception as e:
             yield TriggerEvent({"status": "error", "message": str(e)})
diff --git a/airflow/providers/google/cloud/triggers/kubernetes_engine.py b/airflow/providers/google/cloud/triggers/kubernetes_engine.py
index ff19350cfe..1ec0e420f9 100644
--- a/airflow/providers/google/cloud/triggers/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/triggers/kubernetes_engine.py
@@ -176,7 +176,7 @@ class GKEOperationTrigger(BaseTrigger):
                             "operation_name": operation.name,
                         }
                     )
-
+                    return
                 elif status == Operation.Status.RUNNING or status == 
Operation.Status.PENDING:
                     self.log.info("Operation is still running.")
                     self.log.info("Sleeping for %ss...", self.poll_interval)
@@ -189,6 +189,7 @@ class GKEOperationTrigger(BaseTrigger):
                             "message": f"Operation has failed with status: 
{operation.status}",
                         }
                     )
+                    return
             except Exception as e:
                 self.log.exception("Exception occurred while checking 
operation status")
                 yield TriggerEvent(
@@ -197,6 +198,7 @@ class GKEOperationTrigger(BaseTrigger):
                         "message": str(e),
                     }
                 )
+                return
 
     def _get_hook(self) -> GKEAsyncHook:
         if self._hook is None:

Reply via email to