SameerMesiah97 opened a new pull request, #61951:
URL: https://github.com/apache/airflow/pull/61951
**Description**
This change refactors the `DataprocCreateClusterOperator.execute` method to
ensure cluster state reconciliation is consistently applied in non-deferrable
mode.
After submitting the cluster creation request and waiting for the
long-running operation (LRO) to complete, the operator now explicitly fetches
the current cluster state and passes it through a dedicated
`_reconcile_cluster_state` method before returning success.
The reconciliation logic, previously embedded inline in `execute`, has been
consolidated into `_reconcile_cluster_state`. This method waits for clusters in
`CREATING`, waits for clusters in `DELETING` to be removed and then recreates
them, and starts clusters in `STOPPED`.
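The refactored non-deferrable flow can be sketched as follows. This is a minimal, self-contained illustration under stated assumptions, not the operator's actual code. `RecordingHook`, `execute`, and `reconcile_cluster_state` are hypothetical stand-ins that do not reproduce the real `DataprocHook` or `_reconcile_cluster_state` signatures, and cluster states are plain strings.

```python
from types import SimpleNamespace


class RecordingHook:
    """Hypothetical stand-in for DataprocHook that records the call order."""

    def __init__(self, state_after_create: str) -> None:
        self.calls: list[str] = []
        self._state = state_after_create

    def create_cluster(self, name: str) -> SimpleNamespace:
        self.calls.append("create_cluster")
        # The returned object mimics an LRO whose result() blocks until done.
        return SimpleNamespace(result=lambda: self.calls.append("lro.result"))

    def get_cluster(self, name: str) -> SimpleNamespace:
        self.calls.append("get_cluster")
        return SimpleNamespace(name=name, state=self._state)


def reconcile_cluster_state(hook: RecordingHook, cluster: SimpleNamespace) -> SimpleNamespace:
    # Hypothetical placeholder: the real method waits, recreates, or starts per state.
    hook.calls.append(f"reconcile:{cluster.state}")
    return cluster


def execute(hook: RecordingHook, name: str) -> SimpleNamespace:
    operation = hook.create_cluster(name)
    operation.result()                # 1. wait for the create LRO
    cluster = hook.get_cluster(name)  # 2. re-read the cluster after the LRO
    # 3. always reconcile instead of returning as soon as the LRO finishes
    return reconcile_cluster_state(hook, cluster)
```

With `RecordingHook("DELETING")`, the recorded call order is `create_cluster`, `lro.result`, `get_cluster`, `reconcile:DELETING`, so a cluster that lands in `DELETING` during creation reaches reconciliation instead of being reported as a success.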
**Rationale**
The operator docstring specifies that when `use_if_exists=True`, the
operator should:
* Wait if the cluster is in `CREATING`
* Wait for deletion and then create a new cluster if in `DELETING`
* If the cluster is in `ERROR`, delete it (when `delete_on_error=True`) and
raise an error
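A sketch of how this state contract, together with the `STOPPED` start described earlier, might be dispatched. Everything here is hypothetical: `FakeService` replays a scripted sequence of states, `None` stands for a cluster that no longer exists, `ClusterError` stands in for `AirflowException`, and polling is bounded by `max_polls` without sleeping, whereas real code would wait between polls and rely on the hook's timeouts.

```python
from typing import Callable, List, Optional


class ClusterError(Exception):
    """Hypothetical stand-in for AirflowException."""


class FakeService:
    """Hypothetical cluster API replaying scripted states (None = cluster gone)."""

    def __init__(self, states: List[Optional[str]]) -> None:
        self._states = list(states)
        self.actions: List[str] = []

    def get_state(self) -> Optional[str]:
        # Consume one scripted state per poll; repeat the last one forever.
        if len(self._states) > 1:
            return self._states.pop(0)
        return self._states[0]

    def create(self) -> None:
        self.actions.append("create")
        self._states = ["RUNNING"]

    def start(self) -> None:
        self.actions.append("start")
        self._states = ["RUNNING"]

    def delete(self) -> None:
        self.actions.append("delete")
        self._states = [None]


def wait_while(
    get_state: Callable[[], Optional[str]], state: str, max_polls: int
) -> Optional[str]:
    """Poll until the cluster leaves `state` (real code would sleep between polls)."""
    for _ in range(max_polls):
        current = get_state()
        if current != state:
            return current
    raise ClusterError(f"Cluster still {state} after {max_polls} polls")


def reconcile(
    svc: FakeService,
    state: Optional[str],
    delete_on_error: bool = False,
    max_polls: int = 5,
) -> Optional[str]:
    if state == "CREATING":
        # Wait for creation to finish, then fall through to re-check the new state.
        state = wait_while(svc.get_state, "CREATING", max_polls)
    if state == "DELETING":
        # Wait until the old cluster is gone, then create a fresh one.
        wait_while(svc.get_state, "DELETING", max_polls)
        svc.create()
        state = svc.get_state()
    if state == "STOPPED":
        svc.start()
        state = svc.get_state()
    if state == "ERROR":
        if delete_on_error:
            svc.delete()
        raise ClusterError("Cluster is in ERROR state")
    return state
```

Chaining the checks as sequential `if` blocks, rather than `elif`, lets a cluster that leaves `CREATING` in `DELETING` or `ERROR` fall through to the matching handler.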
Although state-handling logic existed, the non-deferrable execution path
previously returned immediately after the create LRO completed, preventing the
existing reconciliation logic from being triggered in certain scenarios (e.g., a
cluster transitioning to `DELETING` during creation).
This change ensures the pre-existing reconciliation behavior is executed
consistently, aligning runtime behavior with the documented contract.
**Notes**
* Added explicit `NotFound` handling after the LRO completes, so that a clear
`AirflowException` is raised if the cluster was deleted before its state could
be reconciled.
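A minimal sketch of this `NotFound` handling. It assumes local stand-ins for `google.api_core.exceptions.NotFound` and `airflow.exceptions.AirflowException` so the snippet runs without those packages, and `fetch_cluster_after_lro` is a hypothetical helper name, not the PR's code.

```python
from typing import Any, Callable


class NotFound(Exception):
    """Hypothetical stand-in for google.api_core.exceptions.NotFound."""


class AirflowException(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowException."""


def fetch_cluster_after_lro(get_cluster: Callable[[str], Any], name: str) -> Any:
    """Re-read the cluster once the create LRO has finished (hypothetical helper)."""
    try:
        return get_cluster(name)
    except NotFound as err:
        # Surface a clear task failure instead of a raw API error,
        # keeping the original exception as the cause.
        raise AirflowException(
            f"Cluster {name} was deleted before its state could be reconciled"
        ) from err
```

Chaining with `from err` keeps the underlying API error visible in the traceback while the task fails with the clearer message.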
* Additional logging has been added and some existing log messages have been
clarified or cleaned up for improved observability during state transitions.
* Comments have been added and variable names clarified where appropriate.
**Tests**
Unit tests have been added to cover reconciliation scenarios:
* `CREATING`: verifies the operator waits for creation to complete and
transitions correctly to `RUNNING`.
* `DELETING`: verifies the operator waits for deletion to complete and then
re-creates the cluster.
* `DELETING` (timeout): verifies the operator raises an `AirflowException`
when the cluster remains in `DELETING` state and deletion is not triggered.
* `STOPPED`: verifies the operator triggers cluster start logic.
* `ERROR`: verifies error-state handling and deletion behavior when
`delete_on_error=True`.
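State-transition tests like these can be driven by scripting successive `get_cluster` responses with `unittest.mock`. The sketch below is hypothetical: `wait_for_deletion` and the stand-in exception classes are not the PR's code, and it assumes deletion is detected when `get_cluster` raises `NotFound`. It mirrors the `DELETING` and `DELETING` (timeout) cases.

```python
import unittest
from unittest import mock


class NotFound(Exception):
    """Hypothetical stand-in for google.api_core.exceptions.NotFound."""


class AirflowException(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowException."""


def wait_for_deletion(hook, name, max_polls=3):
    # Hypothetical helper: assumes deletion is complete once get_cluster raises NotFound.
    for _ in range(max_polls):
        try:
            hook.get_cluster(name)
        except NotFound:
            return
    raise AirflowException(f"Cluster {name} still DELETING after {max_polls} polls")


class WaitForDeletionTest(unittest.TestCase):
    def test_returns_once_cluster_is_gone(self):
        hook = mock.MagicMock()
        # Each call consumes the next item; exception instances are raised.
        hook.get_cluster.side_effect = [mock.sentinel.deleting, NotFound()]
        wait_for_deletion(hook, "demo")
        self.assertEqual(hook.get_cluster.call_count, 2)

    def test_raises_when_deletion_never_completes(self):
        hook = mock.MagicMock()
        hook.get_cluster.return_value = mock.sentinel.deleting
        with self.assertRaises(AirflowException):
            wait_for_deletion(hook, "demo", max_polls=3)
        self.assertEqual(hook.get_cluster.call_count, 3)
```

Because `side_effect` items that are exceptions are raised rather than returned, one list can script both the "still deleting" and "gone" responses.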
Existing tests have been updated to align with the new reconciliation flow
and state handling behavior.
**Backwards Compatibility**
There is no intended change to the operator’s public contract. The
implementation now consistently executes the previously defined reconciliation
logic in non-deferrable mode.
Closes: #61947