This is an automated email from the ASF dual-hosted git repository.
husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 66299338eb add a return when the event is yielded in a loop to stop the execution (#31985)
66299338eb is described below
commit 66299338eb24aa71eb2e27ebd8b76079b39fd305
Author: Hussein Awala <[email protected]>
AuthorDate: Sun Jun 18 11:14:37 2023 +0200
add a return when the event is yielded in a loop to stop the execution (#31985)
Signed-off-by: Hussein Awala <[email protected]>
---
airflow/providers/cncf/kubernetes/triggers/pod.py | 5 +++++
airflow/providers/databricks/triggers/databricks.py | 1 +
airflow/providers/google/cloud/triggers/bigquery.py | 21 ++++++++++++++++++++-
.../providers/google/cloud/triggers/bigquery_dts.py | 8 ++++----
airflow/providers/google/cloud/triggers/dataflow.py | 4 ++++
.../providers/google/cloud/triggers/datafusion.py | 4 +++-
airflow/providers/google/cloud/triggers/dataproc.py | 5 +++++
airflow/providers/google/cloud/triggers/gcs.py | 4 ++++
.../google/cloud/triggers/kubernetes_engine.py | 4 +++-
9 files changed, 49 insertions(+), 7 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/triggers/pod.py
b/airflow/providers/cncf/kubernetes/triggers/pod.py
index 74a1f8787e..0a7b41b8c0 100644
--- a/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -141,6 +141,7 @@ class KubernetesPodTrigger(BaseTrigger):
"message": "All containers inside pod have started
successfully.",
}
)
+ return
elif self.should_wait(pod_phase=pod_status,
container_state=container_state):
self.log.info("Container is not completed and still
working.")
@@ -159,6 +160,7 @@ class KubernetesPodTrigger(BaseTrigger):
"message": message,
}
)
+ return
self.log.info("Sleeping for %s seconds.",
self.poll_interval)
await asyncio.sleep(self.poll_interval)
@@ -171,6 +173,7 @@ class KubernetesPodTrigger(BaseTrigger):
"message": pod.status.message,
}
)
+ return
except CancelledError:
# That means that task was marked as failed
if self.get_logs:
@@ -193,6 +196,7 @@ class KubernetesPodTrigger(BaseTrigger):
"message": "Pod execution was cancelled",
}
)
+ return
except Exception as e:
self.log.exception("Exception occurred while checking pod
phase:")
yield TriggerEvent(
@@ -203,6 +207,7 @@ class KubernetesPodTrigger(BaseTrigger):
"message": str(e),
}
)
+ return
def _get_async_hook(self) -> AsyncKubernetesHook:
if self._hook is None:
diff --git a/airflow/providers/databricks/triggers/databricks.py
b/airflow/providers/databricks/triggers/databricks.py
index c2400e2c97..e5e56cc0ff 100644
--- a/airflow/providers/databricks/triggers/databricks.py
+++ b/airflow/providers/databricks/triggers/databricks.py
@@ -89,6 +89,7 @@ class DatabricksExecutionTrigger(BaseTrigger):
"run_state": run_state.to_json(),
}
)
+ return
else:
self.log.info(
"run-id %s in run state %s. sleeping for %s seconds",
diff --git a/airflow/providers/google/cloud/triggers/bigquery.py
b/airflow/providers/google/cloud/triggers/bigquery.py
index 990fb3e948..bb3973c338 100644
--- a/airflow/providers/google/cloud/triggers/bigquery.py
+++ b/airflow/providers/google/cloud/triggers/bigquery.py
@@ -88,16 +88,19 @@ class BigQueryInsertJobTrigger(BaseTrigger):
"message": "Job completed",
}
)
+ return
elif response_from_hook == "pending":
self.log.info("Query is still running...")
self.log.info("Sleeping for %s seconds.",
self.poll_interval)
await asyncio.sleep(self.poll_interval)
else:
yield TriggerEvent({"status": "error", "message":
response_from_hook})
+ return
except Exception as e:
self.log.exception("Exception occurred while checking for
query completion")
yield TriggerEvent({"status": "error", "message": str(e)})
+ return
def _get_async_hook(self) -> BigQueryAsyncHook:
return BigQueryAsyncHook(gcp_conn_id=self.conn_id)
@@ -140,6 +143,7 @@ class BigQueryCheckTrigger(BigQueryInsertJobTrigger):
"records": None,
}
)
+ return
else:
# Extract only first record from the query results
first_record = records.pop(0)
@@ -149,6 +153,7 @@ class BigQueryCheckTrigger(BigQueryInsertJobTrigger):
"records": first_record,
}
)
+ return
elif response_from_hook == "pending":
self.log.info("Query is still running...")
@@ -156,9 +161,11 @@ class BigQueryCheckTrigger(BigQueryInsertJobTrigger):
await asyncio.sleep(self.poll_interval)
else:
yield TriggerEvent({"status": "error", "message":
response_from_hook})
+ return
except Exception as e:
self.log.exception("Exception occurred while checking for
query completion")
yield TriggerEvent({"status": "error", "message": str(e)})
+ return
class BigQueryGetDataTrigger(BigQueryInsertJobTrigger):
@@ -206,15 +213,18 @@ class BigQueryGetDataTrigger(BigQueryInsertJobTrigger):
"records": records,
}
)
+ return
elif response_from_hook == "pending":
self.log.info("Query is still running...")
self.log.info("Sleeping for %s seconds.",
self.poll_interval)
await asyncio.sleep(self.poll_interval)
else:
yield TriggerEvent({"status": "error", "message":
response_from_hook})
+ return
except Exception as e:
self.log.exception("Exception occurred while checking for
query completion")
yield TriggerEvent({"status": "error", "message": str(e)})
+ return
class BigQueryIntervalCheckTrigger(BigQueryInsertJobTrigger):
@@ -345,6 +355,7 @@ class
BigQueryIntervalCheckTrigger(BigQueryInsertJobTrigger):
"second_row_data": second_job_row,
}
)
+ return
elif first_job_response_from_hook == "pending" or
second_job_response_from_hook == "pending":
self.log.info("Query is still running...")
self.log.info("Sleeping for %s seconds.",
self.poll_interval)
@@ -353,10 +364,12 @@ class
BigQueryIntervalCheckTrigger(BigQueryInsertJobTrigger):
yield TriggerEvent(
{"status": "error", "message":
second_job_response_from_hook, "data": None}
)
+ return
except Exception as e:
self.log.exception("Exception occurred while checking for
query completion")
yield TriggerEvent({"status": "error", "message": str(e)})
+ return
class BigQueryValueCheckTrigger(BigQueryInsertJobTrigger):
@@ -428,16 +441,18 @@ class BigQueryValueCheckTrigger(BigQueryInsertJobTrigger):
records = records.pop(0) if records else None
hook.value_check(self.sql, self.pass_value, records,
self.tolerance)
yield TriggerEvent({"status": "success", "message": "Job
completed", "records": records})
+ return
elif response_from_hook == "pending":
self.log.info("Query is still running...")
self.log.info("Sleeping for %s seconds.",
self.poll_interval)
await asyncio.sleep(self.poll_interval)
else:
yield TriggerEvent({"status": "error", "message":
response_from_hook, "records": None})
-
+ return
except Exception as e:
self.log.exception("Exception occurred while checking for
query completion")
yield TriggerEvent({"status": "error", "message": str(e)})
+ return
class BigQueryTableExistenceTrigger(BaseTrigger):
@@ -495,10 +510,12 @@ class BigQueryTableExistenceTrigger(BaseTrigger):
)
if response:
yield TriggerEvent({"status": "success", "message":
"success"})
+ return
await asyncio.sleep(self.poll_interval)
except Exception as e:
self.log.exception("Exception occurred while checking for
Table existence")
yield TriggerEvent({"status": "error", "message": str(e)})
+ return
async def _table_exists(
self, hook: BigQueryTableAsyncHook, dataset: str, table_id: str,
project_id: str
@@ -577,9 +594,11 @@ class
BigQueryTablePartitionExistenceTrigger(BigQueryTableExistenceTrigger):
"message": f"Partition: {self.partition_id} in
table: {self.table_id}",
}
)
+ return
job_id = None
elif status == "error":
yield TriggerEvent({"status": "error", "message": status})
+ return
self.log.info("Sleeping for %s seconds.", self.poll_interval)
await asyncio.sleep(self.poll_interval)
diff --git a/airflow/providers/google/cloud/triggers/bigquery_dts.py
b/airflow/providers/google/cloud/triggers/bigquery_dts.py
index 354a689445..95e6aed397 100644
--- a/airflow/providers/google/cloud/triggers/bigquery_dts.py
+++ b/airflow/providers/google/cloud/triggers/bigquery_dts.py
@@ -105,7 +105,7 @@ class BigQueryDataTransferRunTrigger(BaseTrigger):
"config_id": self.config_id,
}
)
-
+ return
elif state == TransferState.FAILED:
self.log.info("Job has failed")
yield TriggerEvent(
@@ -115,7 +115,7 @@ class BigQueryDataTransferRunTrigger(BaseTrigger):
"message": "Job has failed",
}
)
-
+ return
if state == TransferState.CANCELLED:
self.log.info("Job has been cancelled.")
yield TriggerEvent(
@@ -125,12 +125,11 @@ class BigQueryDataTransferRunTrigger(BaseTrigger):
"message": "Job was cancelled",
}
)
-
+ return
else:
self.log.info("Job is still working...")
self.log.info("Waiting for %s seconds", self.poll_interval)
await asyncio.sleep(self.poll_interval)
-
except Exception as e:
yield TriggerEvent(
{
@@ -138,6 +137,7 @@ class BigQueryDataTransferRunTrigger(BaseTrigger):
"message": f"Trigger failed with exception: {str(e)}",
}
)
+ return
def _get_async_hook(self) -> AsyncBiqQueryDataTransferServiceHook:
return AsyncBiqQueryDataTransferServiceHook(
diff --git a/airflow/providers/google/cloud/triggers/dataflow.py
b/airflow/providers/google/cloud/triggers/dataflow.py
index bc04ce64b2..5dfdf5106a 100644
--- a/airflow/providers/google/cloud/triggers/dataflow.py
+++ b/airflow/providers/google/cloud/triggers/dataflow.py
@@ -107,6 +107,7 @@ class TemplateJobStartTrigger(BaseTrigger):
"message": "Job completed",
}
)
+ return
elif status == JobState.JOB_STATE_FAILED:
yield TriggerEvent(
{
@@ -114,6 +115,7 @@ class TemplateJobStartTrigger(BaseTrigger):
"message": f"Dataflow job with id {self.job_id}
has failed its execution",
}
)
+ return
elif status == JobState.JOB_STATE_STOPPED:
yield TriggerEvent(
{
@@ -121,6 +123,7 @@ class TemplateJobStartTrigger(BaseTrigger):
"message": f"Dataflow job with id {self.job_id}
was stopped",
}
)
+ return
else:
self.log.info("Job is still running...")
self.log.info("Current job status is: %s", status)
@@ -129,6 +132,7 @@ class TemplateJobStartTrigger(BaseTrigger):
except Exception as e:
self.log.exception("Exception occurred while checking for job
completion.")
yield TriggerEvent({"status": "error", "message": str(e)})
+ return
def _get_async_hook(self) -> AsyncDataflowHook:
return AsyncDataflowHook(
diff --git a/airflow/providers/google/cloud/triggers/datafusion.py
b/airflow/providers/google/cloud/triggers/datafusion.py
index 8e2007d8cb..66ed139f34 100644
--- a/airflow/providers/google/cloud/triggers/datafusion.py
+++ b/airflow/providers/google/cloud/triggers/datafusion.py
@@ -101,16 +101,18 @@ class DataFusionStartPipelineTrigger(BaseTrigger):
"message": "Pipeline is running",
}
)
+ return
elif response_from_hook == "pending":
self.log.info("Pipeline is not still in running state...")
self.log.info("Sleeping for %s seconds.",
self.poll_interval)
await asyncio.sleep(self.poll_interval)
else:
yield TriggerEvent({"status": "error", "message":
response_from_hook})
-
+ return
except Exception as e:
self.log.exception("Exception occurred while checking for
pipeline state")
yield TriggerEvent({"status": "error", "message": str(e)})
+ return
def _get_async_hook(self) -> DataFusionAsyncHook:
return DataFusionAsyncHook(
diff --git a/airflow/providers/google/cloud/triggers/dataproc.py
b/airflow/providers/google/cloud/triggers/dataproc.py
index 852d5d9b0d..6f2be85124 100644
--- a/airflow/providers/google/cloud/triggers/dataproc.py
+++ b/airflow/providers/google/cloud/triggers/dataproc.py
@@ -279,8 +279,10 @@ class DataprocDeleteClusterTrigger(DataprocBaseTrigger):
await asyncio.sleep(self.polling_interval_seconds)
except NotFound:
yield TriggerEvent({"status": "success", "message": ""})
+ return
except Exception as e:
yield TriggerEvent({"status": "error", "message": str(e)})
+ return
yield TriggerEvent({"status": "error", "message": "Timeout"})
@@ -322,6 +324,7 @@ class DataprocWorkflowTrigger(DataprocBaseTrigger):
"message": operation.error.message,
}
)
+ return
yield TriggerEvent(
{
"operation_name": operation.name,
@@ -330,6 +333,7 @@ class DataprocWorkflowTrigger(DataprocBaseTrigger):
"message": "Operation is successfully ended.",
}
)
+ return
else:
self.log.info("Sleeping for %s seconds.",
self.polling_interval_seconds)
await asyncio.sleep(self.polling_interval_seconds)
@@ -341,3 +345,4 @@ class DataprocWorkflowTrigger(DataprocBaseTrigger):
"message": str(e),
}
)
+ return
diff --git a/airflow/providers/google/cloud/triggers/gcs.py
b/airflow/providers/google/cloud/triggers/gcs.py
index 18424c7be9..9595cc6ca8 100644
--- a/airflow/providers/google/cloud/triggers/gcs.py
+++ b/airflow/providers/google/cloud/triggers/gcs.py
@@ -79,6 +79,7 @@ class GCSBlobTrigger(BaseTrigger):
)
if res == "success":
yield TriggerEvent({"status": "success", "message": res})
+ return
await asyncio.sleep(self.poke_interval)
except Exception as e:
yield TriggerEvent({"status": "error", "message": str(e)})
@@ -159,6 +160,7 @@ class GCSCheckBlobUpdateTimeTrigger(BaseTrigger):
)
if status:
yield TriggerEvent(res)
+ return
await asyncio.sleep(self.poke_interval)
except Exception as e:
yield TriggerEvent({"status": "error", "message": str(e)})
@@ -262,6 +264,7 @@ class GCSPrefixBlobTrigger(GCSBlobTrigger):
yield TriggerEvent(
{"status": "success", "message": "Successfully
completed", "matches": res}
)
+ return
await asyncio.sleep(self.poke_interval)
except Exception as e:
yield TriggerEvent({"status": "error", "message": str(e)})
@@ -364,6 +367,7 @@ class GCSUploadSessionTrigger(GCSPrefixBlobTrigger):
res = self._is_bucket_updated(set(list_blobs))
if res["status"] in ("success", "error"):
yield TriggerEvent(res)
+ return
await asyncio.sleep(self.poke_interval)
except Exception as e:
yield TriggerEvent({"status": "error", "message": str(e)})
diff --git a/airflow/providers/google/cloud/triggers/kubernetes_engine.py
b/airflow/providers/google/cloud/triggers/kubernetes_engine.py
index ff19350cfe..1ec0e420f9 100644
--- a/airflow/providers/google/cloud/triggers/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/triggers/kubernetes_engine.py
@@ -176,7 +176,7 @@ class GKEOperationTrigger(BaseTrigger):
"operation_name": operation.name,
}
)
-
+ return
elif status == Operation.Status.RUNNING or status ==
Operation.Status.PENDING:
self.log.info("Operation is still running.")
self.log.info("Sleeping for %ss...", self.poll_interval)
@@ -189,6 +189,7 @@ class GKEOperationTrigger(BaseTrigger):
"message": f"Operation has failed with status:
{operation.status}",
}
)
+ return
except Exception as e:
self.log.exception("Exception occurred while checking
operation status")
yield TriggerEvent(
@@ -197,6 +198,7 @@ class GKEOperationTrigger(BaseTrigger):
"message": str(e),
}
)
+ return
def _get_hook(self) -> GKEAsyncHook:
if self._hook is None: