karthi-keyan-n opened a new issue, #46563:
URL: https://github.com/apache/airflow/issues/46563
### Apache Airflow version
2.10.4
### If "Other Airflow 2 version" selected, which one?
_No response_
### What happened?
While creating a workflow in Databricks using `DatabricksWorkflowTaskGroup`, I need to pass some spark-submit parameters as Jinja templates.
Sample task-creation code with `DatabricksWorkflowTaskGroup`:
```
databricks_workflow = DatabricksWorkflowTaskGroup(
    group_id="test_wf",
    databricks_conn_id=DATABRICKS_CONNECTION_ID,
    add_suffix_on_collision=True,
    job_clusters=job_clusters_spec,
)
with databricks_workflow:
    task1 = build_task_1(dag)
    task2 = build_task_2(dag)
    task1 >> task2
```
In the sample above, `build_task_1` and `build_task_2` return tasks built from my custom operator, which extends `DatabricksTaskBaseOperator`.
Sample code for the custom operator:
```
class CustomDatabricksWorkflowTaskOperator(DatabricksTaskBaseOperator):
    """Custom operator wrapping DatabricksTaskOperator."""

    template_fields = (
        "application_args",
        "spark_conf",
        "driver_jvm_props",
    )

    def __init__(
        self,
        *args,
        class_name: str = "",
        spark_conf: dict[str, str] | str | None = None,
        application_args: list[str] | str | None = None,
        driver_jvm_props: dict[str, str] | None = None,
        application_conf_file: str = "test.conf",
        libraries: list[dict[str, str]] | None = None,
        **kwargs,
    ):
        self.class_name = class_name
        self.application_conf_file = application_conf_file
        self.library = libraries if libraries is not None else _get_default_libraries()
        self.application_args = application_args
        super().__init__(*args, **kwargs)
        self.driver_jvm_props = driver_jvm_props
        self.spark_conf = spark_conf

    def _get_task_base_json(self) -> dict[str, Any]:
        return {
            "run_if": self.databricks_trigger_rule,
            "spark_jar_task": {
                "main_class_name": self.class_name,
                "parameters": [
                    self.application_args,
                    # Hard-coded here for reference. In the real case this value
                    # comes in via self.application_args, which is listed in
                    # template_fields.
                    "{{ get_test_template_value(ti,'test_key') }}",
                ],
            },
            "libraries": self.library,
        }

    def execute(self, context):
        super().execute(context)
```
In the code above, `{{ get_test_template_value(ti,'test_key') }}` is not resolved when the `launch` task from `DatabricksWorkflowTaskGroup` runs, but the same value is resolved later, at task execution time in Airflow.
Sample JSON from the `launch` task:
```
"tasks": [
{
"task_key": "task1",
"depends_on": [],
"run_if": "NONE_FAILED",
"spark_jar_task": {
"main_class_name": "task1.jar",
"parameters": [
"app.conf",
"{{ get_test_template_value(ti,'test_key') }}"
]
},
"libraries": [
{
"jar": "test.jar"
}
],
"job_cluster_key": "job_cluster"
},
```
Note: the `get_test_template_value` function is registered as a macro at DAG creation time.
Even though `application_args` is listed in the `template_fields` of `CustomDatabricksWorkflowTaskOperator`, the template is not resolved.
### What you think should happen instead?
The expectation is that Jinja template resolution happens when the `launch` task triggers the API request to Databricks that creates the workflow. Instead, template resolution happens lazily, at task execution time in Airflow, which is too late here: the corresponding task has already been created in Databricks during `launch`, and Databricks has no way to resolve the templated value.
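The timing problem above can be sketched without Airflow, using plain `jinja2`: a task-JSON dict built before rendering carries the raw placeholder into serialization, while rendering each argument first produces the resolved payload. The `build_task_json` helper and the macro body are illustrative stand-ins, not Airflow or provider code.

```python
# Minimal sketch (plain jinja2, no Airflow) of why the launch-time JSON
# contains the raw template string: the payload is serialized *before*
# template_fields are rendered.
import json

import jinja2

def build_task_json(application_args):
    # Mirrors the shape of _get_task_base_json(); names are illustrative.
    return {"spark_jar_task": {"parameters": list(application_args)}}

env = jinja2.Environment()
# Hypothetical macro body, standing in for get_test_template_value.
env.globals["get_test_template_value"] = lambda key: f"resolved:{key}"

raw_args = ["app.conf", "{{ get_test_template_value('test_key') }}"]

# 1) What launch effectively does today: serialize before rendering,
#    so the placeholder is sent to Databricks verbatim.
launch_json = json.dumps(build_task_json(raw_args))

# 2) What would be needed: render each arg first, then serialize.
rendered_args = [env.from_string(arg).render() for arg in raw_args]
rendered_json = json.dumps(build_task_json(rendered_args))
```
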
### How to reproduce
1. Create a DAG with `DatabricksWorkflowTaskGroup`
2. Add a couple of tasks
3. Create a custom Databricks task operator by extending `DatabricksTaskBaseOperator`
4. Add one of the constructor parameters to the `template_fields` section, to test the templating
5. Override the `_get_task_base_json` method to return a templated field/value as part of the returned dictionary
6. Trigger the DAG
7. In the `launch` task logs, notice that the parameter is passed through as-is, without the template being resolved
8. During actual task execution, the resolved value can be seen in the rendered-template section
### Operating System
PRETTY_NAME="Debian GNU/Linux 12 (bookworm)", NAME="Debian GNU/Linux"
### Versions of Apache Airflow Providers
NA
### Deployment
Other Docker-based deployment
### Deployment details
We are building our own docker image with base image as
`apache/airflow:2.10.4-python3.10` and use the same for deployment.
### Anything else?
Other approaches I tried inside the `_get_task_base_json` method of the custom operator (the method responsible for returning the task JSON):
1. Forcing the render with `super().render_template(self.application_args, context)` and `super().render_template_fields(context)`, which was not possible because `context` is not in scope inside `_get_task_base_json`.
2. Since `_get_task_base_json` is an abstract method that I had to override with its given signature, I could not pass a context object into step 1.
Due to this constraint, neither template rendering nor pulling from XCom is supported; in other words, none of the capabilities provided by the context object are available.
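The dead end described above can be sketched in plain Python (Airflow classes mocked, all names illustrative): even if `execute()` captures the context so that `_get_task_base_json()` could render with it, the `launch` task also calls `_get_task_base_json()`, and at that point no per-task context has been captured yet, so the placeholder survives.

```python
# Sketch of the attempted workaround and why it falls short. This is a
# mock, not the provider's DatabricksTaskBaseOperator; rendering is
# simulated with str.format instead of Jinja.
class FakeTaskOperator:
    template_fields = ("application_args",)

    def __init__(self, application_args):
        self.application_args = application_args
        self._context = None  # populated only at task execution time

    def _get_task_base_json(self):
        args = self.application_args
        if self._context is not None:
            # Stand-in for render_template_fields(context): resolve
            # {key}-style placeholders from the captured context.
            args = [a.format(**self._context) for a in args]
        return {"spark_jar_task": {"parameters": args}}

    def execute(self, context):
        self._context = context
        return self._get_task_base_json()

op = FakeTaskOperator(["app.conf", "{test_key}"])

# Launch-time call path: no context captured yet -> raw placeholder.
launch_view = op._get_task_base_json()

# Execution-time call path: context available -> placeholder resolved,
# but by then the Databricks job was already created with the raw string.
exec_view = op.execute({"test_key": "resolved_value"})
```
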
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)