SameerMesiah97 opened a new issue, #61947:
URL: https://github.com/apache/airflow/issues/61947

   ### Apache Airflow Provider(s)
   
   google
   
   ### Versions of Apache Airflow Providers
   
   `apache-airflow-providers-google==20.0.0rc1`
   
   ### Apache Airflow version
   
   main
   
   ### Operating System
   
   Debian GNU/Linux 12 (bookworm)
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
   `DataprocCreateClusterOperator` in non-deferrable mode does not correctly 
handle the case where a cluster transitions to `DELETING` during the creation 
lifecycle.
   
   When the operator submits a create request and waits for the long-running 
operation (LRO) to complete, it assumes the cluster has successfully reached a 
stable state once the LRO finishes. However, if the cluster is manually deleted 
while it is still in `CREATING`, the following can occur:
   
   * The LRO completes successfully.
   * The operator proceeds without reconciling the actual cluster state.
   * The task may succeed or fail without properly handling the `DELETING` 
transition.
   
   As a result, the operator does not always converge the cluster to a stable 
`RUNNING` state before returning.
   
   ### What you think should happen instead
   
   In non-deferrable mode, after the LRO completes, the operator should always 
fetch the current cluster state and reconcile it according to the documented 
behavior.
   
   If the cluster is in `DELETING`, the operator should wait for deletion to 
complete and then attempt to create the cluster again.
   
   If the cluster is in `CREATING`, it should wait until the cluster reaches a 
terminal state.
   
   If the cluster is in `ERROR`, it should handle deletion according to 
`delete_on_error`.
   
   In other words, the operator should consistently converge the cluster to a 
stable `RUNNING` state before returning success.
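The reconciliation rules above can be sketched as a polling loop. This is a minimal, hypothetical sketch and not the operator's actual code. `client` is an assumed object exposing `get_state()`, `create()` and `delete()`. `get_state()` is assumed to return the state name, or `None` once the cluster no longer exists. The sketch also raises after an `ERROR` whether or not `delete_on_error` is set, which is an assumption: the rule above only says that deletion follows `delete_on_error`.

```python
import time
from typing import Callable


class ClusterInErrorState(RuntimeError):
    """Raised when the cluster ends up in ERROR (hypothetical exception type)."""


def reconcile_until_running(
    client,
    delete_on_error: bool,
    poll_interval: float = 10.0,
    max_polls: int = 360,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Drive a hypothetical `client` until its cluster reports RUNNING.

    `client` is assumed to expose get_state() -> Optional[str], create() and
    delete(); get_state() returns None once the cluster no longer exists.
    """
    for _ in range(max_polls):
        state = client.get_state()
        if state == "RUNNING":
            return  # stable state reached: safe to report success
        if state == "ERROR":
            if delete_on_error:
                client.delete()  # clean up before failing the task
            raise ClusterInErrorState("cluster entered ERROR state")
        if state is None:
            # A DELETING cluster has finished deleting: request a new one.
            client.create()
        # CREATING, DELETING or any other transitional state: wait and re-check.
        sleep(poll_interval)
    raise TimeoutError(f"cluster not RUNNING after {max_polls} polls")
```

Injecting `sleep` keeps the loop testable without real waiting. `max_polls` bounds how long a cluster that keeps being deleted can hold the task.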
   
   ### How to reproduce
   
   1. Configure a Google Cloud connection in Airflow (for example 
`google_cloud_default`) with a service account that has permission to create 
and delete Dataproc clusters.
   
   2. Create the following minimal DAG (non-deferrable mode).
      **Replace `<YOUR_PROJECT_ID>` with your actual GCP project ID before 
running.**
   
   ```python
   from datetime import datetime
   from airflow import DAG
   from airflow.providers.google.cloud.operators.dataproc import (
       DataprocCreateClusterOperator,
   )
   
   PROJECT_ID = "<YOUR_PROJECT_ID>"  # <-- replace this
   REGION = "us-central1"
   CLUSTER_NAME = "airflow-delete-during-create-repro"
   
   CLUSTER_CONFIG = {
       "master_config": {
           "num_instances": 1,
           "machine_type_uri": "n2-standard-2",
           "disk_config": {
               "boot_disk_size_gb": 30
           },
       },
       "worker_config": {
           "num_instances": 2,
           "machine_type_uri": "n2-standard-2",
           "disk_config": {
               "boot_disk_size_gb": 30
           },
       },
   }
   
   with DAG(
       dag_id="dataproc_delete_during_create_repro",
       start_date=datetime(2025, 1, 1),
       schedule=None,
       catchup=False,
   ) as dag:
   
       create_cluster = DataprocCreateClusterOperator(
           task_id="create_cluster",
           project_id=PROJECT_ID,
           region=REGION,
           cluster_name=CLUSTER_NAME,
           cluster_config=CLUSTER_CONFIG,
           deferrable=False,  # important for repro
       )
   ```
   
   3. Trigger the DAG.
   
   4. While the cluster is in `CREATING` state (visible in the GCP Console 
under Dataproc → Clusters), manually delete the cluster.
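To hit the timing window more reliably than clicking in the console, the deletion in step 4 can also be scripted. This is a minimal sketch that assumes the `google-cloud-dataproc` client library is installed and that application default credentials can delete clusters in the project. `make_client`, `cluster_state` and `delete_while_creating` are hypothetical helper names. The script polls `list_clusters` until the target cluster reports `CREATING`. It then calls `delete_cluster` and does not wait for the deletion to finish.

```python
import time
from typing import Callable, Optional


def make_client(region: str):
    """Build a regional Dataproc cluster client (requires google-cloud-dataproc)."""
    # Imported lazily so the helpers below can be exercised without the library.
    from google.cloud import dataproc_v1

    return dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )


def cluster_state(client, project_id: str, region: str, cluster_name: str) -> Optional[str]:
    """Return the cluster's state name (e.g. "CREATING"), or None if it is not listed."""
    for cluster in client.list_clusters(project_id=project_id, region=region):
        if cluster.cluster_name == cluster_name:
            return cluster.status.state.name
    return None


def delete_while_creating(
    client,
    project_id: str,
    region: str,
    cluster_name: str,
    poll_interval: float = 2.0,
    max_polls: int = 300,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Delete the cluster as soon as it is seen in CREATING; True if a delete was sent."""
    for _ in range(max_polls):
        if cluster_state(client, project_id, region, cluster_name) == "CREATING":
            # Send the delete request; the returned operation is not awaited.
            client.delete_cluster(
                project_id=project_id, region=region, cluster_name=cluster_name
            )
            return True
        sleep(poll_interval)
    return False
```

Start it right after triggering the DAG, for example `delete_while_creating(make_client(REGION), PROJECT_ID, REGION, CLUSTER_NAME)` with the constants from the repro DAG.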
   
   **Observed Behavior**
   
   The task does not consistently reconcile the intermediate `DELETING` state. 
Instead of waiting for deletion to complete and creating a new cluster (as 
described in the operator docstring), the task succeeds without fully 
reconciling the cluster back to a `RUNNING` state.
   
   ### Anything else
   
   The operator docstring already states that if a cluster is in `DELETING`, it 
should wait for deletion and then create a new cluster. The existing code also 
contains logic intended to reconcile intermediate states (`CREATING`, 
`DELETING`, `STOPPED`) until the cluster reaches a stable `RUNNING` state.
   
   However, in non-deferrable mode, execution can return early on the LRO path once the create operation completes, so that reconciliation logic never runs.
   
   **The proposed change does not introduce novel behavior**. It ensures that 
reconciliation always runs after the LRO completes, bringing runtime behavior 
into alignment with both **the existing reconciliation logic and the documented 
contract**.
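The difference described here can be reduced to two control-flow shapes. This is a hypothetical, simplified sketch, not the provider's source. `wait_for_lro` stands in for waiting on the create operation, and `reconcile` stands in for the existing state-reconciliation logic. Both function names are illustrative.

```python
from typing import Callable


def execute_with_early_return(
    wait_for_lro: Callable[[], str], reconcile: Callable[[], str]
) -> str:
    """Problem shape: the LRO result is returned directly, so reconcile() never runs."""
    return wait_for_lro()


def execute_then_reconcile(
    wait_for_lro: Callable[[], str], reconcile: Callable[[], str]
) -> str:
    """Proposed shape: wait for the LRO, then always reconcile the live cluster state."""
    wait_for_lro()
    return reconcile()
```

With the first shape, a cluster deleted mid-creation is never re-checked. With the second, the reconcile step observes `DELETING` and can recreate the cluster.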
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

