[ https://issues.apache.org/jira/browse/AIRFLOW-6795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Nathaniel Ritholtz updated AIRFLOW-6795:
----------------------------------------
    Summary: serialized_dag table's data column text type is too small for mysql  (was: serialized_dag table's data type max length is too small)

> serialized_dag table's data column text type is too small for mysql
> -------------------------------------------------------------------
>
>                 Key: AIRFLOW-6795
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6795
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: serialization
>    Affects Versions: 1.10.9
>            Reporter: Nathaniel Ritholtz
>            Priority: Major
>
> When upgrading to v1.10.9, I tried using the new store_serialized_dags flag.
> However, the scheduler was erroring out with:
> {code}
> scheduler_1 | Process DagFileProcessor2163-Process:
> scheduler_1 | Traceback (most recent call last):
> scheduler_1 |   File "/usr/local/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
> scheduler_1 |     self.run()
> scheduler_1 |   File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
> scheduler_1 |     self._target(*self._args, **self._kwargs)
> scheduler_1 |   File "/usr/local/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 157, in _run_file_processor
> scheduler_1 |     pickle_dags)
> scheduler_1 |   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
> scheduler_1 |     return func(*args, **kwargs)
> scheduler_1 |   File "/usr/local/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1580, in process_file
> scheduler_1 |     dag.sync_to_db()
> scheduler_1 |   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
> scheduler_1 |     return func(*args, **kwargs)
> scheduler_1 |   File "/usr/local/lib/python3.6/site-packages/airflow/models/dag.py", line 1514, in sync_to_db
> scheduler_1 |     session=session
> scheduler_1 |   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 70, in wrapper
> scheduler_1 |     return func(*args, **kwargs)
> scheduler_1 |   File "/usr/local/lib/python3.6/site-packages/airflow/models/serialized_dag.py", line 118, in write_dag
> scheduler_1 |     session.merge(cls(dag))
> scheduler_1 |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2113, in merge
> scheduler_1 |     _resolve_conflict_map=_resolve_conflict_map,
> scheduler_1 |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2186, in _merge
> scheduler_1 |     merged = self.query(mapper.class_).get(key[1])
> scheduler_1 |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 1004, in get
> scheduler_1 |     return self._get_impl(ident, loading.load_on_pk_identity)
> scheduler_1 |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 1116, in _get_impl
> scheduler_1 |     return db_load_fn(self, primary_key_identity)
> scheduler_1 |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 284, in load_on_pk_identity
> scheduler_1 |     return q.one()
> scheduler_1 |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3347, in one
> scheduler_1 |     ret = self.one_or_none()
> scheduler_1 |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3316, in one_or_none
> scheduler_1 |     ret = list(self)
> scheduler_1 |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 101, in instances
> scheduler_1 |     util.raise_from_cause(err)
"/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 398, > in raise_from_cause > scheduler_1 | reraise(type(exception), exception, tb=exc_tb, cause=cause) > scheduler_1 | File > "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 153, > in reraise > scheduler_1 | raise value > scheduler_1 | File > "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 81, > in instances > scheduler_1 | rows = [proc(row) for row in fetch] > scheduler_1 | File > "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 81, > in <listcomp> > scheduler_1 | rows = [proc(row) for row in fetch] > scheduler_1 | File > "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 574, > in _instance > scheduler_1 | populators, > scheduler_1 | File > "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 695, > in _populate_full > scheduler_1 | dict_[key] = getter(row) > scheduler_1 | File > "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/type_api.py", line > 1266, in process > scheduler_1 | return process_value(impl_processor(value), dialect) > scheduler_1 | File > "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/sqltypes.py", line > 2407, in process > scheduler_1 | return json_deserializer(value) > scheduler_1 | File "/usr/local/lib/python3.6/json/__init__.py", line 354, > in loads > scheduler_1 | return _default_decoder.decode(s) > scheduler_1 | File "/usr/local/lib/python3.6/json/decoder.py", line 339, > in decode > scheduler_1 | obj, end = self.raw_decode(s, idx=_w(s, 0).end()) > scheduler_1 | File "/usr/local/lib/python3.6/json/decoder.py", line 355, > in raw_decode > scheduler_1 | obj, end = self.scan_once(s, idx) > {code} > After further investigation, I found it was because of my usage of the > airflow-db-dag > (https://github.com/teamclairvoyant/airflow-maintenance-dags/blob/master/db-cleanup/airflow-db-cleanup.py). > In this DAG, the params to the PythonOperators includes a hash that has > values of objects such as DagRun. As a result the resulting serialization of > the DAG is pretty large. When I looked at the text column for the record in > the serialized_dag table for this DAG, I saw that the data was cutoff mid DAG > and the character length was at 65535. This is because the data column is > type TEXT which has a max character length at 65535. So what I assume was > happenning is the process which was storing the DAG serialized data was > forced to truncate the DAG somewhere in the middle of the serialization. > Is it possible to maybe change the TEXT field to MEDIUMTEXT? Locally I made > the change on my mysql DB and the DAG was able to be serialized/deserialized > successfully. -- This message was sent by Atlassian Jira (v8.3.4#803005)