ephraimbuddy commented on code in PR #56476:
URL: https://github.com/apache/airflow/pull/56476#discussion_r2417291249
##########
airflow-core/src/airflow/models/serialized_dag.py:
##########
@@ -316,23 +324,29 @@ class SerializedDagModel(Base):
load_op_links = True
- def __init__(self, dag: LazyDeserializedDAG) -> None:
- self.dag_id = dag.dag_id
- dag_data = dag.data
- self.dag_hash = SerializedDagModel.hash(dag_data)
-
- # partially ordered json data
- dag_data_json = json.dumps(dag_data, sort_keys=True).encode("utf-8")
-
- if COMPRESS_SERIALIZED_DAGS:
+ def __init__(
+ self,
+ *,
+ dag_id: str,
+ dag_data: dict,
+ dag_hash: str,
Review Comment:
Why these args?
##########
airflow-core/src/airflow/config_templates/default_airflow.cfg:
##########
@@ -43,3 +43,5 @@
#
# airflow config list --defaults > "${AIRFLOW_HOME}/airflow.cfg"
#
+[dag_processor]
+dag_bundle_config_list = [{"name": "test-bundle", "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", "kwargs": {}}]
Review Comment:
Why?
##########
airflow-core/src/airflow/models/serialized_dag.py:
##########
@@ -347,30 +361,52 @@ def hash(cls, dag_data):
# does not affect the hash. In 3.0+, a combination of
# bundle_path and relative fileloc more correctly determines the
# dag file location.
+ # Remove file location from hash calculation so that DAG hash
+ # depends only on DAG structure, not file location.
data_["dag"].pop("fileloc", None)
- data_json = json.dumps(data_, sort_keys=True).encode("utf-8")
+ # hash method
+ data_json = json.dumps(data_, sort_keys=True, default=_json_default).encode("utf-8")
return md5(data_json).hexdigest()
@classmethod
def _sort_serialized_dag_dict(cls, serialized_dag: Any):
- """Recursively sort json_dict and its nested dictionaries and lists."""
+ """
+ Sorts dicts and lists recursively for deterministic hashing.
+ Examples:
+ - dict keys sorted alphabetically
+ - list of dicts with __var.task_id sorted by task_id
+ - list of strings sorted alphabetically
+ """
+
if isinstance(serialized_dag, dict):
return {k: cls._sort_serialized_dag_dict(v) for k, v in sorted(serialized_dag.items())}
+
if isinstance(serialized_dag, list):
+ # Case 1: list of dicts with __var.task_id
if all(isinstance(i, dict) for i in serialized_dag):
- if all(
- isinstance(i.get("__var", {}), Iterable) and "task_id" in i.get("__var", {})
- for i in serialized_dag
- ):
+ if all(isinstance(i.get("__var", {}), dict) and "task_id" in i.get("__var", {}) for i in serialized_dag):
return sorted(
[cls._sort_serialized_dag_dict(i) for i in serialized_dag],
- key=lambda x: x["__var"]["task_id"],
+ key=lambda x: x.get("__var", {}).get("task_id", "")
)
+ # Case 2: list of strings
elif all(isinstance(item, str) for item in serialized_dag):
return sorted(serialized_dag)
+ # Case 3: mixed or other dicts
return [cls._sort_serialized_dag_dict(i) for i in serialized_dag]
+
+ # Base case: primitive type
return serialized_dag
+
+ @classmethod
+ def serialize_dag(cls, dag: LazyDeserializedDAG, compress: bool = False) -> str:
Review Comment:
Where is this used?
##########
airflow-core/src/airflow/models/serialized_dag.py:
##########
@@ -409,56 +444,86 @@ def write_dag(
):
return False
- log.debug("Checking if DAG (%s) changed", dag.dag_id)
- new_serialized_dag = cls(dag)
- serialized_dag_hash = session.scalars(
+ if log.isEnabledFor(logging.DEBUG):
+ log.debug("Checking if DAG (%s) changed", dag.dag_id)
Review Comment:
Why this?
##########
airflow-core/src/airflow/models/serialized_dag.py:
##########
@@ -347,30 +361,52 @@ def hash(cls, dag_data):
# does not affect the hash. In 3.0+, a combination of
# bundle_path and relative fileloc more correctly determines the
# dag file location.
+ # Remove file location from hash calculation so that DAG hash
+ # depends only on DAG structure, not file location.
data_["dag"].pop("fileloc", None)
- data_json = json.dumps(data_, sort_keys=True).encode("utf-8")
+ # hash method
+ data_json = json.dumps(data_, sort_keys=True, default=_json_default).encode("utf-8")
return md5(data_json).hexdigest()
@classmethod
def _sort_serialized_dag_dict(cls, serialized_dag: Any):
- """Recursively sort json_dict and its nested dictionaries and lists."""
+ """
+ Sorts dicts and lists recursively for deterministic hashing.
+ Examples:
+ - dict keys sorted alphabetically
+ - list of dicts with __var.task_id sorted by task_id
+ - list of strings sorted alphabetically
+ """
Review Comment:
Honestly, this method is the only place you have to work on. The other changes are not necessary.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]