gmarendaz commented on issue #35979:
URL: https://github.com/apache/airflow/issues/35979#issuecomment-1838256382
@yiqijiu: thanks for your answer. Indeed, I didn't have the encoding declaration at the beginning of my DAG code; I added it, but the problem remains the same. Here is the full code:

```python
# -*- coding: utf-8 -*-
from _lib import _template_slave
from airflow import DAG
from airflow.hooks.base_hook import BaseHook
from airflow.models import Variable
from airflow.models import XCom
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.db import provide_session
from datetime import datetime
from io import StringIO, BytesIO
import chardet
import datetime
import json
import logging
import ntpath
import numpy as np
import os
import pandas as pd
import re
import sqlalchemy
import unicodedata
import yaml

DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "")

default_args = {
    'owner': 'IMEDA',
    'start_date': days_ago(2),
    'weight_rule': "upstream"
}


## CHECK
def check_file(**kwargs):
    path = kwargs["dag_run"].conf['path']
    blacklist = Variable.get("manufacturing_blacklist_masp", deserialize_json=True)
    if any(s in path for s in blacklist):
        return "blacklist"
    else:
        return "pick_extract_mode"


def pick_extract_mode(**kwargs):
    path = kwargs["dag_run"].conf['path']
    with open(path, encoding='latin_1') as f:
        file_ = [line for line in f]
    for n, line in enumerate(file_):
        if line.startswith("<Info>"):
            return "new_extract"
    return "old_extract"


## EXTRACT
def get_header_fields(path, df, comment):
    with open(path, encoding='latin_1') as f:
        for line in f:
            if line.startswith(comment + "\tNo de série :"):
                df["batch_id"] = re.search(r'.*No de série :\t([a-zA-Z0-9]*)', line).group(1)
                n_plaque = re.search(r'.*N° plaque :\t([0-9]*)', line).group(1)
                # Fill with zeros to keep order 01, 02, .., 10, 11, ...
                if len(n_plaque) < 2:
                    n_plaque = n_plaque.zfill(2)
                df["n_plaque"] = n_plaque
            elif line.startswith(comment + "\tProduit :"):
                df["produit"] = re.search(r'.*Produit :\t(.*)\ttechnologie :', line).group(1)
                df["technologie"] = re.search(r'.*technologie :\t(.*)\tOpérateur', line).group(1)
            if line.startswith(comment + "\tFichier limites :"):
                limit_file = re.search(r'\tFichier limites :\t(.*)\tdu :', line).group(1)
                df["limit_file"] = limit_file.lower().replace(" ", "")
    return df


def old_extract(**kwargs):
    path = kwargs["dag_run"].conf['path']
    header_size = _template_slave.header(path, "Puce N°")
    raw_df = pd.read_csv(path, header=header_size, sep="\t", engine='python',
                         encoding='latin_1', encoding_errors='ignore', on_bad_lines='skip')
    # Keep only rows with more than 5 non-NaN values
    df = raw_df[raw_df.count(axis=1) > 6][1:]
    df = get_header_fields(path, df, comment="")
    str_columns = [col for col in df.columns if isinstance(col, str)]
    df[str_columns] = df[str_columns].rename(columns=str.lower)\
                                     .rename(columns=_template_slave.remove_accents)\
                                     .rename(columns=_template_slave.remove_special_characters)
    return df


def new_extract(**kwargs):
    path = kwargs["dag_run"].conf['path']
    df = pd.read_csv(path, comment='#', sep="\t", header=[0, 1], engine='python',
                     encoding='latin_1', encoding_errors='ignore', on_bad_lines='skip')
    df.columns = df.columns.map('_'.join)
    str_columns = [col for col in df.columns if isinstance(col, str)]
    df[str_columns] = df[str_columns].rename(columns=str.lower)\
                                     .rename(columns=_template_slave.remove_accents)\
                                     .rename(columns=_template_slave.remove_special_characters)
    df = get_header_fields(path, df, comment="###\t")
    with open(path, encoding='latin_1') as f:
        file_ = [line for line in f]
    for n, line in enumerate(file_):
        if line.startswith("Fichier clôturé"):
            return df[1:-1]
    return df[1:]


## TRANSFORM
"""def custom_deserialize(df):
    return json.load(df.decode('latin_1'))"""


def concat_time_date(df):
    df['date_heure'] = pd.to_datetime(df['info_date'] + " " + df['info_heure'],
                                      dayfirst=True, errors="coerce")
    df = df.drop(['info_date', 'info_heure'], axis=1)
    return df


def transform(**kwargs):
    ti = kwargs['ti']
    task_outputs = ti.xcom_pull(task_ids=["old_extract", "new_extract"])
    logging.info(task_outputs)
    extracting_res = [output for output in task_outputs if output is not None]
    df = extracting_res[0]
    df = df.rename(columns={
        "puce_n": "info_puce_n",
        "pos_x": "info_pos_x",
        "pos_y": "info_pos_y",
        "date": "info_date",
        "heure": "info_heure",
        "pos_a_x_g": "capa_meas_pos_meas_at_x",
        "status": "info_status",
        "csup_a_x_g": "capa_meas_csup_x_g",
        "cinf_a_x_g": "capa_meas_cinf_x_g",
        "csup_a_0_g": "capa_meas_csup_0_g",
        "rsup_a_0_g": "capa_meas_rsup_0_g",
        "cinf_a_0_g": "capa_meas_cinf_0_g",
        "rinf_a_0_g": "capa_meas_rinf_0_g",
        "csup_pol": "capa_meas_csup_pol",
        "cinf_pol": "capa_meas_cinf_pol",
        "csup_hyst": "capa_meas_csup_hys",
        "cinf_hyst": "capa_meas_cinf_hys",
        "c_tot": "capa_meas_c_tot",
        "sz": "capa_meas_sz_percent",
        "sz_1": "capa_meas_sz_pf",
        "so": "capa_meas_s0_a",
        "d_pol_sup": "capa_meas_delta_csup_pol",
        "d_pol_inf": "capa_meas_delta_cinf_pol",
        "d_hyst_sup": "capa_meas_delta_csup_hys",
        "d_hyst_inf": "capa_meas_delta_cinf_hys",
        "ampl_a_0": "bw_meas_ampl_0_hz",
        "ampl_x_hz": "bw_meas_ampl_x_hz",
        "fc": "bw_meas_fc",
        "ampl_min": "bw_meas_ampl_min",
        "f_min": "bw_meas_f_min",
        "ampl_res": "bw_meas_ampl_res",
        "f_res": "bw_meas_f_res",
        "mse": "bw_meas_mse",
        "press": "bw_meas_press",
        "q": "bw_meas_q"
    })
    schema = _template_slave.get_schema("test_wafer")
    df = concat_time_date(df)
    df = _template_slave.filter_(df, schema)
    df = _template_slave.cast(df, schema)
    df["location"] = kwargs["dag_run"].conf['location']
    df["src_path"] = kwargs["dag_run"].conf['path']
    return df


## LOAD
def load(**kwargs):
    engine = sqlalchemy.create_engine(BaseHook.get_connection("database").get_uri())
    ti = kwargs['ti']
    df = ti.xcom_pull(task_ids='transform')
    try:
        df.to_sql("test_wafer", con=engine, if_exists='append', index=False)
    except sqlalchemy.exc.IntegrityError:
        pass


@provide_session
def cleanup_xcom(session=None, **kwargs):
    session.query(XCom).filter(XCom.dag_id == DAG_ID)\
        .filter(XCom.execution_date == kwargs["execution_date"])\
        .delete()


with DAG(
    dag_id=DAG_ID,
    default_args=default_args,
    schedule_interval=None,
    max_active_runs=1,
    tags=["manufacturing", "tests"]
) as dag:

    check_task = BranchPythonOperator(
        task_id="check_file",
        python_callable=check_file
    )

    blacklist_task = DummyOperator(
        task_id="blacklist"
    )

    pick_extract_mode_task = BranchPythonOperator(
        task_id="pick_extract_mode",
        python_callable=pick_extract_mode
    )

    old_extract_task = PythonOperator(
        task_id='old_extract',
        python_callable=old_extract,
    )

    new_extract_task = PythonOperator(
        task_id='new_extract',
        python_callable=new_extract,
    )

    transform_task = PythonOperator(
        task_id='transform',
        python_callable=transform,
        trigger_rule="none_failed"
    )

    load_task = PythonOperator(
        task_id='load',
        python_callable=load,
        pool="wafer_pool"
    )

    clean_xcom_task = PythonOperator(
        task_id="clean_xcom",
        python_callable=cleanup_xcom,
    )

    check_task >> [pick_extract_mode_task, blacklist_task]
    pick_extract_mode_task >> [old_extract_task, new_extract_task]
    [old_extract_task, new_extract_task] >> transform_task >> load_task >> clean_xcom_task
```
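Side note: the `# -*- coding: utf-8 -*-` declaration only tells Python how to decode the DAG source file itself (PEP 263); it has no effect on the data files the tasks open at runtime. Since `chardet` is already imported, here is a minimal sketch to check what encoding an input file really uses (the helper name and the sample path are placeholders, not part of the DAG above):

```python
import chardet

def detect_encoding(path, sample_size=100_000):
    """Guess a file's encoding from a raw byte sample using chardet."""
    with open(path, "rb") as f:
        raw = f.read(sample_size)
    # detect() returns e.g. {'encoding': 'ISO-8859-1', 'confidence': 0.73, 'language': ''}
    return chardet.detect(raw)

# Hypothetical path, for illustration only
print(detect_encoding("/data/manufacturing/example_wafer_file.txt"))
```

Worth noting that opening the files with `encoding='latin_1'` never raises, because latin_1 maps every possible byte value to a character; if the files are actually cp1252 or UTF-8, the accented header fields ("N° plaque", "Opérateur") just come out as mojibake, so a mismatch is easy to miss.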