[ https://issues.apache.org/jira/browse/AIRFLOW-6795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nathaniel Ritholtz updated AIRFLOW-6795:
----------------------------------------
    Summary: serialized_dag table's data column text type is too small for mysql  (was: serialized_dag table's data type max length is too small)

> serialized_dag table's data column text type is too small for mysql
> -------------------------------------------------------------------
>
>                 Key: AIRFLOW-6795
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6795
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: serialization
>    Affects Versions: 1.10.9
>            Reporter: Nathaniel Ritholtz
>            Priority: Major
>
> When upgrading to v1.10.9, I tried using the new store_serialized_dags flag. 
> However, the scheduler was erroring out with:
> {code}
> scheduler_1  | Process DagFileProcessor2163-Process:
> scheduler_1  | Traceback (most recent call last):
> scheduler_1  |   File "/usr/local/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
> scheduler_1  |     self.run()
> scheduler_1  |   File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
> scheduler_1  |     self._target(*self._args, **self._kwargs)
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 157, in _run_file_processor
> scheduler_1  |     pickle_dags)
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
> scheduler_1  |     return func(*args, **kwargs)
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1580, in process_file
> scheduler_1  |     dag.sync_to_db()
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
> scheduler_1  |     return func(*args, **kwargs)
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/models/dag.py", line 1514, in sync_to_db
> scheduler_1  |     session=session
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 70, in wrapper
> scheduler_1  |     return func(*args, **kwargs)
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/models/serialized_dag.py", line 118, in write_dag
> scheduler_1  |     session.merge(cls(dag))
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2113, in merge
> scheduler_1  |     _resolve_conflict_map=_resolve_conflict_map,
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2186, in _merge
> scheduler_1  |     merged = self.query(mapper.class_).get(key[1])
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 1004, in get
> scheduler_1  |     return self._get_impl(ident, loading.load_on_pk_identity)
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 1116, in _get_impl
> scheduler_1  |     return db_load_fn(self, primary_key_identity)
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 284, in load_on_pk_identity
> scheduler_1  |     return q.one()
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3347, in one
> scheduler_1  |     ret = self.one_or_none()
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3316, in one_or_none
> scheduler_1  |     ret = list(self)
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 101, in instances
> scheduler_1  |     util.raise_from_cause(err)
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 398, in raise_from_cause
> scheduler_1  |     reraise(type(exception), exception, tb=exc_tb, cause=cause)
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 153, in reraise
> scheduler_1  |     raise value
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 81, in instances
> scheduler_1  |     rows = [proc(row) for row in fetch]
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 81, in <listcomp>
> scheduler_1  |     rows = [proc(row) for row in fetch]
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 574, in _instance
> scheduler_1  |     populators,
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 695, in _populate_full
> scheduler_1  |     dict_[key] = getter(row)
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/type_api.py", line 1266, in process
> scheduler_1  |     return process_value(impl_processor(value), dialect)
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/sqltypes.py", line 2407, in process
> scheduler_1  |     return json_deserializer(value)
> scheduler_1  |   File "/usr/local/lib/python3.6/json/__init__.py", line 354, in loads
> scheduler_1  |     return _default_decoder.decode(s)
> scheduler_1  |   File "/usr/local/lib/python3.6/json/decoder.py", line 339, in decode
> scheduler_1  |     obj, end = self.raw_decode(s, idx=_w(s, 0).end())
> scheduler_1  |   File "/usr/local/lib/python3.6/json/decoder.py", line 355, in raw_decode
> scheduler_1  |     obj, end = self.scan_once(s, idx)
> {code}
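> For context, serialization was enabled with the standard 1.10.9 flag in airflow.cfg, roughly as below (the update-interval value is illustrative; the same setting can also be supplied via the usual AIRFLOW__CORE__STORE_SERIALIZED_DAGS environment-variable mapping):
> {code}
> # airflow.cfg (excerpt) -- enabling DAG serialization in 1.10.9
> [core]
> store_serialized_dags = True
> # seconds between re-serializations of each DAG; value shown is illustrative
> min_serialized_dag_update_interval = 30
> {code}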
> After further investigation, I found it was caused by my usage of the 
> airflow-db-cleanup DAG 
> (https://github.com/teamclairvoyant/airflow-maintenance-dags/blob/master/db-cleanup/airflow-db-cleanup.py).
> In that DAG, the params passed to the PythonOperators include a hash whose 
> values are objects such as DagRun, so the serialized form of the DAG is 
> quite large. When I looked at the data column of the serialized_dag record 
> for this DAG, the JSON was cut off mid-DAG and the character length was 
> exactly 65535. The data column is of type TEXT, which in MySQL holds at 
> most 65,535 bytes, so the process writing the serialized DAG was presumably 
> forced to truncate the JSON partway through.
> Would it be possible to change the TEXT column to MEDIUMTEXT? Locally I 
> made that change on my MySQL database and the DAG was then serialized and 
> deserialized successfully.
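> For reference, a minimal sketch of the manual change I applied locally (the connection URL is an assumption for illustration; a proper fix upstream would presumably be an Alembic migration rather than a hand-run ALTER):
> {code}
> # Sketch of the local workaround: widen serialized_dag.data on MySQL.
> # Assumes a MySQL metadata DB reachable via the URL below (hypothetical).
> import sqlalchemy as sa
> 
> engine = sa.create_engine("mysql+mysqldb://airflow:airflow@localhost/airflow")
> 
> with engine.begin() as conn:
>     # TEXT caps out at 65,535 bytes; MEDIUMTEXT allows up to ~16 MB,
>     # which is enough for this DAG's serialized JSON.
>     conn.execute(sa.text("ALTER TABLE serialized_dag MODIFY COLUMN data MEDIUMTEXT"))
> {code}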



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
