Hi Daniel-
After some trial and error, I was able to isolate the issue. It has to do with
my custom operator. See the code below:
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from contextlib import closing
from typing import Dict, Optional, Union


class MySqlToPostgresOperator(BaseOperator):
    """Selects data from a MySQL database and inserts that data into a
    PostgreSQL database. Cursors are used to minimize memory usage for large
    queries.
    """

    template_fields = ("sql", "postgres_table", "params")
    template_ext = (".sql",)
    ui_color = "#944dff"  # cool purple

    @apply_defaults
    def __init__(
        self,
        sql: str,
        mysql_conn_id: str = "mysql_default",
        postgres_table: str = "",
        postgres_conn_id: str = "postgres_default",
        params: Optional[Dict[str, Union[str, int]]] = None,
        rows_chunk: int = 5000,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        if params is None:
            params = {}
        self.sql = sql
        self.mysql_conn_id = mysql_conn_id
        self.postgres_table = postgres_table
        self.postgres_conn_id = postgres_conn_id
        self.params = params
        self.rows_chunk = rows_chunk

    def execute(self, context):
        """Establish connections to both MySQL & PostgreSQL databases, open
        cursor and begin processing query, loading chunks of rows into
        PostgreSQL. Repeat loading chunks until all rows processed for query.
        """
        source = MySqlHook(mysql_conn_id=self.mysql_conn_id)
        target = PostgresHook(postgres_conn_id=self.postgres_conn_id)
        with closing(source.get_conn()) as conn:
            with closing(conn.cursor()) as cursor:
                cursor.execute(self.sql, self.params)
                target_fields = [x[0] for x in cursor.description]
                row_count = 0
                rows = cursor.fetchmany(self.rows_chunk)
                while len(rows) > 0:
                    row_count += len(rows)
                    target.insert_rows(
                        self.postgres_table,
                        rows,
                        target_fields=target_fields,
                        commit_every=self.rows_chunk,
                    )
                    rows = cursor.fetchmany(self.rows_chunk)
                self.log.info(
                    f"{row_count} row(s) inserted into {self.postgres_table}."
                )
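For context, a stripped-down DAG along these lines should hit the same serialization path. To be clear, this is a sketch rather than the real DAG: the dag_id, SQL, table name, params values, and the import path for the operator module are placeholders.

from datetime import datetime

from airflow import DAG

# placeholder import path for the custom operator above
from mysql_to_postgres_operator import MySqlToPostgresOperator

with DAG(
    dag_id="mysql_to_postgres_repro",
    start_date=datetime(2021, 12, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    copy_table = MySqlToPostgresOperator(
        task_id="copy_table",
        sql="SELECT * FROM source_table WHERE updated_at >= %(since)s",
        postgres_table="target_table",
        # plain str value ends up in op.params, which the serializer inspects
        params={"since": "2021-01-01"},
    )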
Thanks,
Anthony
On Dec 9, 2021, at 11:52 AM, Daniel Standish
<[email protected]> wrote:
Can you provide a dag (as simplified as possible) which we can use to reproduce
this error?
On Thu, Dec 9, 2021 at 8:45 AM Anthony Joyce
<[email protected]> wrote:
Hello fellow users-
I have encountered an error which seems to be related to serialization:
Broken DAG: [/home/etl/airflow/dags/airflow_platypus_etl_dag.py] Traceback (most recent call last):
  File "/home/etl/anaconda3/lib/python3.8/site-packages/airflow/serialization/serialized_objects.py", line 574, in serialize_operator
    serialize_op['params'] = cls._serialize_params_dict(op.params)
  File "/home/etl/anaconda3/lib/python3.8/site-packages/airflow/serialization/serialized_objects.py", line 447, in _serialize_params_dict
    if f'{v.__module__}.{v.__class__.__name__}' == 'airflow.models.param.Param':
AttributeError: 'str' object has no attribute '__module__'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/etl/anaconda3/lib/python3.8/site-packages/airflow/serialization/serialized_objects.py", line 935, in to_dict
    json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": cls.serialize_dag(var)}
  File "/home/etl/anaconda3/lib/python3.8/site-packages/airflow/serialization/serialized_objects.py", line 847, in serialize_dag
    raise SerializationError(f'Failed to serialize DAG {dag.dag_id!r}: {e}')
airflow.exceptions.SerializationError: Failed to serialize DAG 'Platypus_ETL': 'str' object has no attribute '__module__'
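For what it's worth, the attribute lookup the serializer performs fails the same way on an ordinary string in a bare interpreter session (placeholder value, outside Airflow):

>>> v = "2021-01-01"  # a plain str, like the values in my params dict
>>> f"{v.__module__}.{v.__class__.__name__}"
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'str' object has no attribute '__module__'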
I have spent some time trying to figure out what is going on but to no avail.
Anyone have any insight on an error like this? I am on Airflow release 2.2.2
and I am using the default package constraints.
Thanks all,
Anthony