That worked! Thanks Daniel. Your help was much appreciated.

Anthony

On Dec 9, 2021, at 12:53 PM, Daniel Standish <[email protected]> wrote:

OK this looks like an easy one to fix :)

You can't use `params` as a param in an operator. It's already used in `BaseOperator` and has special handling for serialization.

So rename it to `task_params` or anything else, and you should be good.

On Thu, Dec 9, 2021 at 9:44 AM Anthony Joyce <[email protected]> wrote:

Hi Daniel-

After some trial and error, I was able to isolate the issue. It has to do with my custom operator. See code below:

from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from contextlib import closing
from typing import Dict, Optional, Union


class MySqlToPostgresOperator(BaseOperator):
    """Selects data from a MySQL database and inserts that data into a
    PostgreSQL database. Cursors are used to minimize memory usage for large
    queries.
    """

    template_fields = ("sql", "postgres_table", "params")
    template_ext = (".sql",)
    ui_color = "#944dff"  # cool purple

    @apply_defaults
    def __init__(
        self,
        sql: str,
        mysql_conn_id: str = "mysql_default",
        postgres_table: str = "",
        postgres_conn_id: str = "postgres_default",
        params: Optional[Dict[str, Union[str, int]]] = None,
        rows_chunk: int = 5000,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        if params is None:
            params = {}
        self.sql = sql
        self.mysql_conn_id = mysql_conn_id
        self.postgres_table = postgres_table
        self.postgres_conn_id = postgres_conn_id
        self.params = params
        self.rows_chunk = rows_chunk

    def execute(self, context):
        """Establish connections to both MySQL & PostgreSQL databases, open
        cursor and begin processing query, loading chunks of rows into
        PostgreSQL. Repeat loading chunks until all rows processed for query.
        """
        source = MySqlHook(mysql_conn_id=self.mysql_conn_id)
        target = PostgresHook(postgres_conn_id=self.postgres_conn_id)
        with closing(source.get_conn()) as conn:
            with closing(conn.cursor()) as cursor:
                cursor.execute(self.sql, self.params)
                target_fields = [x[0] for x in cursor.description]
                row_count = 0
                rows = cursor.fetchmany(self.rows_chunk)
                while len(rows) > 0:
                    row_count += len(rows)
                    target.insert_rows(
                        self.postgres_table,
                        rows,
                        target_fields=target_fields,
                        commit_every=self.rows_chunk,
                    )
                    rows = cursor.fetchmany(self.rows_chunk)
                self.log.info(
                    f"{row_count} row(s) inserted into {self.postgres_table}."
                )

Thanks,
Anthony

On Dec 9, 2021, at 11:52 AM, Daniel Standish <[email protected]> wrote:

Can you provide a dag (as simplified as possible) which we can use to reproduce this error?

On Thu, Dec 9, 2021 at 8:45 AM Anthony Joyce <[email protected]> wrote:

Hello fellow users-

I have encountered an error which seems to be related to serialization:

Broken DAG: [/home/etl/airflow/dags/airflow_platypus_etl_dag.py] Traceback (most recent call last):
  File "/home/etl/anaconda3/lib/python3.8/site-packages/airflow/serialization/serialized_objects.py", line 574, in serialize_operator
    serialize_op['params'] = cls._serialize_params_dict(op.params)
  File "/home/etl/anaconda3/lib/python3.8/site-packages/airflow/serialization/serialized_objects.py", line 447, in _serialize_params_dict
    if f'{v.__module__}.{v.__class__.__name__}' == 'airflow.models.param.Param':
AttributeError: 'str' object has no attribute '__module__'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/etl/anaconda3/lib/python3.8/site-packages/airflow/serialization/serialized_objects.py", line 935, in to_dict
    json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": cls.serialize_dag(var)}
  File "/home/etl/anaconda3/lib/python3.8/site-packages/airflow/serialization/serialized_objects.py", line 847, in serialize_dag
    raise SerializationError(f'Failed to serialize DAG {dag.dag_id!r}: {e}')
airflow.exceptions.SerializationError: Failed to serialize DAG 'Platypus_ETL': 'str' object has no attribute '__module__'

I have spent some time trying to figure out what is going on but to no avail. Anyone have any insight on an error like this? I am on Airflow release 2.2.2 and I am using the default packages constraints.

Thanks all,
Anthony
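
For anyone landing on this thread later, the rename Daniel suggested might look roughly like the sketch below. It is only an illustration, not Anthony's actual fixed code: `StandInBaseOperator` is a hypothetical stand-in so the snippet runs without Airflow installed (it is NOT the real `BaseOperator`), and `MySqlToPostgresSketch` and `query_args` are made-up names. Listing `task_params` in `template_fields` is also an assumption, carried over from the original operator listing `params` there.

```python
from typing import Any, Dict, Optional, Tuple, Union


class StandInBaseOperator:
    """Hypothetical stand-in for Airflow's BaseOperator (not the real class).

    It mimics only the one fact from the thread: the base class already
    owns an attribute named `params`.
    """

    def __init__(self, task_id: str, params: Optional[Dict[str, Any]] = None):
        self.task_id = task_id
        self.params = dict(params or {})  # reserved by the base class


class MySqlToPostgresSketch(StandInBaseOperator):
    """Trimmed operator with the query arguments renamed to `task_params`."""

    # Assumption: the renamed attribute replaces "params" in template_fields.
    template_fields = ("sql", "postgres_table", "task_params")

    def __init__(
        self,
        sql: str,
        postgres_table: str = "",
        task_params: Optional[Dict[str, Union[str, int]]] = None,
        rows_chunk: int = 5000,
        **kwargs: Any,
    ):
        super().__init__(**kwargs)
        self.sql = sql
        self.postgres_table = postgres_table
        # Stored under its own name, so the base class's `params` is untouched.
        self.task_params = task_params if task_params is not None else {}
        self.rows_chunk = rows_chunk

    def query_args(self) -> Tuple[str, Dict[str, Union[str, int]]]:
        """Return the pair that execute() would hand to cursor.execute()."""
        return self.sql, self.task_params


op = MySqlToPostgresSketch(
    task_id="copy_rows",
    sql="SELECT * FROM t WHERE id > %(min_id)s",
    postgres_table="t_copy",
    task_params={"min_id": 10},
)
```

In the real operator, the matching change inside `execute` would be `cursor.execute(self.sql, self.task_params)`, with nothing in the subclass assigning to `self.params`.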
