karthi-keyan-n opened a new issue, #46563:
URL: https://github.com/apache/airflow/issues/46563

   ### Apache Airflow version
   
   2.10.4
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   While creating a workflow in Databricks using `DatabricksWorkflowTaskGroup`, I need to pass some parameters to spark-submit as Jinja templates.
   
   Sample task creation code with `DatabricksWorkflowTaskGroup`:
   ```python
   databricks_workflow = DatabricksWorkflowTaskGroup(
       group_id="test_wf",
       databricks_conn_id=DATABRICKS_CONNECTION_ID,
       add_suffix_on_collision=True,
       job_clusters=job_clusters_spec,
   )

   with databricks_workflow:
       task1 = build_task_1(dag)
       task2 = build_task_2(dag)
       task1 >> task2
   ```
   
   In the sample above, `build_task_1` and `build_task_2` return tasks built with my custom operator, which extends `DatabricksTaskBaseOperator`.
   
   Sample code for the custom operator:
   
   ```python
   from typing import Any

   from airflow.providers.databricks.operators.databricks import DatabricksTaskBaseOperator


   class CustomDatabricksWorkflowTaskOperator(DatabricksTaskBaseOperator):
       """Custom operator wrapping DatabricksTaskOperator."""

       template_fields = (
           "application_args",
           "spark_conf",
           "driver_jvm_props",
       )

       def __init__(
           self,
           *args,
           class_name: str = "",
           spark_conf: dict[str, str] | str | None = None,
           application_args: list[str] | str | None = None,
           driver_jvm_props: dict[str, str] | None = None,
           application_conf_file: str = "test.conf",
           libraries: list[dict[str, str]] | None = None,
           **kwargs,
       ):
           self.class_name = class_name
           self.application_conf_file = application_conf_file
           self.library = libraries if libraries is not None else _get_default_libraries()
           self.application_args = application_args
           super().__init__(*args, **kwargs)

           self.driver_jvm_props = driver_jvm_props
           self.spark_conf = spark_conf

       def _get_task_base_json(self) -> dict[str, Any]:
           return {
               "run_if": self.databricks_trigger_rule,
               "spark_jar_task": {
                   "main_class_name": self.class_name,
                   "parameters": [
                       self.application_args,
                       # Hard-coded here for reference. In the real case this
                       # comes in via self.application_args, which is listed
                       # in template_fields.
                       "{{ get_test_template_value(ti,'test_key') }}",
                   ],
               },
               "libraries": self.library,
           }

       def execute(self, context):
           super().execute(context)
   ```
   
   
   In the above code sample, `{{ get_test_template_value(ti,'test_key') }}` is not resolved when the `launch` task of the `DatabricksWorkflowTaskGroup` runs, but the same value does get resolved later, at task execution time in Airflow.
   
   Sample JSON from the `launch` task:
   ```
   "tasks": [
       {
         "task_key": "task1",
         "depends_on": [],
         "run_if": "NONE_FAILED",
         "spark_jar_task": {
           "main_class_name": "task1.jar",
           "parameters": [
             "app.conf",
             "{{ get_test_template_value(ti,'test_key') }}"
           ]
         },
         "libraries": [
           {
             "jar": "test.jar"
           }
         ],
         "job_cluster_key": "job_cluster"
       },
   ```
   Note: the `get_test_template_value` function is registered as a user-defined macro at DAG creation time.
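
   For reference, a user-defined macro only becomes a usable name inside the Jinja namespace that Airflow builds when the task renders its own templates; Databricks never sees that namespace, which is why the raw string in the job spec can never resolve on their side. A rough stand-in for that name binding (purely illustrative: it uses `eval` rather than Jinja, and the macro body below is hypothetical):

```python
# Illustrative only: evaluate the expression inside "{{ }}" with the macro
# bound in a namespace, mimicking what the Jinja environment does at render
# time. Airflow does NOT use eval; this only shows the name binding.
def render(template: str, namespace: dict) -> str:
    expr = template.strip().removeprefix("{{").removesuffix("}}").strip()
    return str(eval(expr, {"__builtins__": {}}, namespace))


def get_test_template_value(ti, key):
    # Hypothetical macro body; the real one is supplied through the DAG's
    # user_defined_macros mapping.
    return f"value-for-{key}"


namespace = {"get_test_template_value": get_test_template_value, "ti": None}
print(render("{{ get_test_template_value(ti,'test_key') }}", namespace))
# value-for-test_key
```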
   
   Although I have added `application_args` to `template_fields` in `CustomDatabricksWorkflowTaskOperator`, the template is still not resolved at launch time.
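
   The timing can be illustrated without Airflow at all. In the toy sketch below (all names are illustrative, not Airflow APIs), `template_fields` are rendered only inside `execute`, mirroring Airflow's lazy behavior, so any JSON assembled before execution still carries the raw `{{ ... }}` string:

```python
# Toy model of Airflow's lazy templating (illustrative names only): JSON
# built at "launch" time keeps the raw template string because
# render_template_fields runs only during execute().
class ToyOperator:
    template_fields = ("application_args",)

    def __init__(self, application_args: str):
        self.application_args = application_args

    def get_task_json(self) -> dict:
        # Called while the workflow spec is assembled ("launch" time).
        return {"parameters": [self.application_args]}

    def render_template_fields(self, context: dict) -> None:
        # Crude stand-in for Jinja rendering with the runtime context.
        for field in self.template_fields:
            value = getattr(self, field)
            if isinstance(value, str) and value.startswith("{{") and value.endswith("}}"):
                setattr(self, field, context[value[2:-2].strip()])

    def execute(self, context: dict) -> dict:
        self.render_template_fields(context)  # rendering happens only now
        return self.get_task_json()


op = ToyOperator("{{ test_key }}")
print(op.get_task_json())                        # launch time: raw template survives
print(op.execute({"test_key": "resolved-value"}))  # execute time: value resolved
```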
   
   ### What you think should happen instead?
   
   Jinja template resolution should happen when the `launch` task triggers the API requests to Databricks to create the workflow. Because template resolution happens lazily, i.e. at task execution time in Airflow, it is too late in this case: the respective task has already been created in Databricks during `launch`, and Databricks has no way to resolve the templated value.
   
   
   ### How to reproduce
   
   1. Create a DAG with `DatabricksWorkflowTaskGroup`
   2. Add a couple of tasks
   3. Create a custom Databricks task operator by extending `DatabricksTaskBaseOperator`
   4. Add one of the constructor parameters to `template_fields`
   5. Override the `_get_task_base_json` method to return a templated field/value as part of the returned dictionary
   6. Trigger the DAG
   7. In the `launch` task logs, notice that the parameter is passed as the raw template string without being resolved
   8. During the actual task execution in Airflow, the resolved value is visible in the rendered-template section
   
   ### Operating System
   
   PRETTY_NAME="Debian GNU/Linux 12 (bookworm)", NAME="Debian GNU/Linux"
   
   ### Versions of Apache Airflow Providers
   
   NA
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   We build our own Docker image on top of `apache/airflow:2.10.4-python3.10` and use it for deployment.
   
   ### Anything else?
   
   Other approaches I tried inside the custom operator's `_get_task_base_json` method, which is responsible for returning the task JSON:
   
   1. Forcing the render with `super().render_template(self.application_args, context)` and `super().render_template_fields(context)`. This was not possible because `context` is not in scope inside `_get_task_base_json`.
   2. Since `_get_task_base_json` is an abstract method that must be overridden with this signature, there is no way to pass a context object into step 1.
   
   Because of this constraint, template rendering and pulling from XCom are not possible either; in other words, none of the capabilities provided by the context object are available.
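
   One workaround that may apply when the value does not actually need runtime context (no `ti`/XCom): resolve it eagerly at DAG-definition time with a plain Python call, so the JSON handed to the `launch` task is already concrete. A hedged sketch, with a hypothetical stand-in for the macro body:

```python
# Hedged workaround sketch: if the parameter can be computed without the
# runtime context, call plain Python at DAG-definition time instead of
# passing a "{{ ... }}" string that the launch task cannot render.
def get_test_template_value(key: str) -> str:
    # Hypothetical stand-in for the macro body, callable without a context.
    return f"value-for-{key}"


def build_application_args() -> list[str]:
    # Concrete values go straight into the operator's application_args.
    return ["app.conf", get_test_template_value("test_key")]


print(build_application_args())
# ['app.conf', 'value-for-test_key']
```

   This does not cover values that genuinely depend on `ti` or XCom, which is the unresolved part of this issue.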
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

