hardeybisey commented on code in PR #44960:
URL: https://github.com/apache/airflow/pull/44960#discussion_r1890298916


##########
providers/src/airflow/providers/databricks/operators/databricks.py:
##########
@@ -1127,19 +1134,17 @@ def _get_current_databricks_task(self) -> dict[str, 
Any]:
         # building the {task_key: task} map below.
         sorted_task_runs = sorted(tasks, key=lambda x: x["start_time"])
 
-        return {task["task_key"]: task for task in sorted_task_runs}[
-            self._get_databricks_task_id(self.task_id)
-        ]
+        return {task["task_key"]: task for task in 
sorted_task_runs}[self.databricks_task_key]
 
     def _convert_to_databricks_workflow_task(
         self, relevant_upstreams: list[BaseOperator], context: Context | None 
= None
     ) -> dict[str, object]:
         """Convert the operator to a Databricks workflow task that can be a 
task in a workflow."""
         base_task_json = self._get_task_base_json()
         result = {
-            "task_key": self._get_databricks_task_id(self.task_id),
+            "task_key": self.databricks_task_key,
             "depends_on": [
-                {"task_key": self._get_databricks_task_id(task_id)}
+                {"task_key": self._generate_databricks_task_key(task_id)}
                 for task_id in self.upstream_task_ids
                 if task_id in relevant_upstreams
             ],

Review Comment:
   @rawwar `_generate_databricks_task_key` is implemented as a separate 
function because of how the `task_key` is generated on line 1147. At that 
point, we only have access to the `task_id` and not the operator object. 
Therefore, `_generate_databricks_task_key` is called with the `task_id` as its 
input.
   
   I also noticed a potential issue: according to this function's signature, 
`relevant_upstreams` is a list of `BaseOperator` instances (or subclasses). 
However, we are checking for the presence of `task_id`, which is a string, 
within this list. This looks like a bug.



##########
providers/src/airflow/providers/databricks/operators/databricks.py:
##########
@@ -1037,17 +1040,21 @@ def _get_hook(self, caller: str) -> DatabricksHook:
             caller=caller,
         )
 
-    def _get_databricks_task_id(self, task_id: str) -> str:
-        """Get the databricks task ID using dag_id and task_id. Removes 
illegal characters."""
-        task_id = f"{self.dag_id}__{task_id.replace('.', '__')}"
-        if len(task_id) > 100:
-            self.log.warning(
-                "The generated task_key '%s' exceeds 100 characters and will 
be truncated by the Databricks API. "
-                "This will cause failure when trying to monitor the task. 
task_key is generated by ",
-                "concatenating dag_id and task_id.",
-                task_id,
+    @cached_property
+    def databricks_task_key(self) -> str:
+        return self._generate_databricks_task_key()
+
+    def _generate_databricks_task_key(self, task_id: str | None = None) -> str:
+        """Create a databricks task key using the hash of dag_id and 
task_id."""
+        if not self._databricks_task_key or len(self._databricks_task_key) > 
100:
+            self.log.info(
+                "databricks_task_key has not be provided or the provided one 
exceeds 100 characters and will be truncated by the Databricks API. This will 
cause failure when trying to monitor the task. A task_key will be generated 
using the hash value of dag_id+task_id"
             )
-        return task_id
+            task_id = task_id or self.task_id
+            task_key = f"{self.dag_id}__{task_id}".encode()
+            self._databricks_task_key = hashlib.md5(task_key).hexdigest()
+            self.log.info("Generated databricks task_key: %s", 
self._databricks_task_key)
+        return self._databricks_task_key

Review Comment:
   I left a comment on line 1150 explaining my reason for keeping the function 
separate.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to