karenbraganz opened a new issue, #64609:
URL: https://github.com/apache/airflow/issues/64609

   ### Apache Airflow version
   
   Reported on 2.10.5 but could happen on any version.
   
   ### What happened and how to reproduce it?
   
   This is an issue specific to the Databricks provider. 
   
The retry_args [passed to the DatabricksExecutionTrigger](https://github.com/apache/airflow/blob/5497cd16be220a5e5b9ad1ed571de22cca261e95/providers/databricks/src/airflow/providers/databricks/operators/databricks.py#L219) is a dictionary that may contain callables among its values. Triggers are serialized and stored in the Airflow DB, then deserialized for execution, but Airflow does not support serialization/deserialization of callables passed as trigger arguments: such arguments come back as strings. When one of those strings is later invoked as a function, an exception is raised:
   ```
   Traceback (most recent call last):
     File "/usr/local/lib/python3.10/site-packages/airflow/jobs/triggerer_job_runner.py", line 558, in cleanup_finished_triggers
       result = details["task"].result()
     File "/usr/local/lib/python3.10/site-packages/airflow/jobs/triggerer_job_runner.py", line 630, in run_trigger
       async for event in trigger.run():
     File "/usr/local/lib/python3.10/site-packages/airflow/providers/databricks/triggers/databricks.py", line 88, in run
       run_state = await self.hook.a_get_run_state(self.run_id)
     File "/usr/local/lib/python3.10/site-packages/airflow/providers/databricks/hooks/databricks.py", line 450, in a_get_run_state
       response = await self._a_do_api_call(GET_RUN_ENDPOINT, json)
     File "/usr/local/lib/python3.10/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 730, in _a_do_api_call
       async for attempt in self._a_get_retry_object():
     File "/usr/local/lib/python3.10/site-packages/tenacity/asyncio/__init__.py", line 166, in __anext__
       do = await self.iter(retry_state=self._retry_state)
     File "/usr/local/lib/python3.10/site-packages/tenacity/asyncio/__init__.py", line 153, in iter
       result = await action(retry_state)
     File "/usr/local/lib/python3.10/site-packages/tenacity/asyncio/__init__.py", line 135, in _run_wait
       sleep = await _utils.wrap_to_async_func(self.wait)(retry_state)
     File "/usr/local/lib/python3.10/site-packages/tenacity/_utils.py", line 99, in inner
       return call(*args, **kwargs)
   TypeError: 'str' object is not callable
   ```
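   The failure mode can be illustrated outside Airflow with a stand-in serializer (hypothetical, not Airflow's actual `BaseSerialization`) that falls back to `str()` for values it cannot encode, which is effectively what happens to the callables in `retry_args` when the trigger round-trips through the DB:

   ```python
   import json


   def naive_serialize(value):
       """Stand-in serializer: keep JSON-friendly values, fall back to
       str() for anything else (loosely mirroring how callables in
       trigger kwargs come back as strings after deserialization)."""
       try:
           json.dumps(value)
           return value
       except TypeError:
           return str(value)  # a callable degrades to its repr string


   def wait_thirty_seconds(retry_state):
       """Toy wait strategy standing in for tenacity's wait objects."""
       return 30


   retry_args = {"wait": wait_thirty_seconds, "reraise": True}

   # Round-trip the trigger kwargs through the stand-in serializer.
   restored = {k: naive_serialize(v) for k, v in retry_args.items()}

   print(type(restored["wait"]))  # <class 'str'> -- no longer a callable
   try:
       restored["wait"](None)  # what tenacity does with the wait strategy
   except TypeError as exc:
       print(exc)  # 'str' object is not callable
   ```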
   
   This results in a task failure if Airflow attempts to retry a call to the 
Databricks API. 
   
   This can be reproduced by passing a custom retry_args dictionary, with a callable as one of its values, to the DatabricksSubmitRunOperator in a deferrable task. I observed the issue with these values:
   ```
   {
       "wait": wait_incrementing(start=30, increment=30, max=300),
       "stop": stop_after_attempt(self.databricks_retry_limit),
       "reraise": True
   }
   ```
   To reproduce this, the first call to the Databricks API must fail so that a retry is attempted. In the case I observed, the first attempt failed intermittently (possibly a networking blip) and every subsequent retry failed with the TypeError above because of the custom retry_args. With the custom retry_args removed, the API calls retried successfully.
   
   ### What you think should happen instead?
   
   Serialization/deserialization of callables in triggers is known to be unsupported in Airflow, but this should not surface as unexpected failures in deferrable tasks. There are two options for addressing this:
   
   - Pass the import path of the callable to the trigger instead of the callable itself. When the Databricks hook retries the API call, it can import the callable from the path provided to the trigger and use that. Since the path is a plain string, serializing it is not an issue.
   - If the above is not feasible, the operator should raise an exception when custom retry_args are passed together with deferrable=True. That would at least prevent unexpected failures during API call retries.
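   The first option amounts to storing an import path (plus any constructor arguments) in the trigger kwargs and resolving it back to a callable at run time. A minimal standard-library sketch of that round trip; the helper names here are hypothetical, not the provider's API:

   ```python
   import importlib


   def encode_strategy(path, /, **kwargs):
       """Encode a retry strategy as a JSON-friendly dict: the dotted
       import path of its factory plus the constructor arguments."""
       return {"path": path, "kwargs": kwargs}


   def decode_strategy(spec):
       """Re-import the factory from its dotted path on the triggerer
       side and call it with the stored arguments to rebuild the
       strategy object."""
       module_name, _, attr = spec["path"].rpartition(".")
       factory = getattr(importlib.import_module(module_name), attr)
       return factory(**spec["kwargs"])


   # A wait strategy like wait_incrementing(start=30, increment=30, max=300)
   # would travel as:
   #   {"path": "tenacity.wait_incrementing",
   #    "kwargs": {"start": 30, "increment": 30, "max": 300}}

   # Stdlib demonstration of the round trip (no tenacity dependency):
   spec = encode_strategy("datetime.timedelta", seconds=30)
   delta = decode_strategy(spec)
   print(delta.total_seconds())  # 30.0
   ```

   Configured strategies such as `wait_incrementing(...)` carry constructor state, so the path alone is not enough; storing the arguments alongside the path, as above, keeps everything serializable as plain strings and numbers.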
   
   ### Operating System
   
   _No response_
   
   ### Versions of Apache Airflow Providers
   
   This was reported in version 7.0.0 of the Databricks provider but has not 
been fixed even in the latest version.
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

