gmarendaz commented on issue #35979:
URL: https://github.com/apache/airflow/issues/35979#issuecomment-1838256382

   @yiqijiu: thanks for your answer. Indeed, I didn't have the encoding declaration at the beginning of my DAG code; I added it, but the problem remains the same.
   
   Here is the full code:
   ```python
   # -*- coding: utf-8 -*-
   
   from _lib import _template_slave
   from airflow import DAG
   from airflow.hooks.base_hook import BaseHook
   from airflow.models import Variable
   from airflow.models import XCom
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.operators.python_operator import BranchPythonOperator
   from airflow.operators.python_operator import PythonOperator
   from airflow.utils.dates import days_ago
   from airflow.utils.db import provide_session
   from io import StringIO, BytesIO
   import chardet
   import datetime
   import json
   import logging
   import ntpath
   import numpy as np
   import os
   import pandas as pd
   import re
   import sqlalchemy
   import unicodedata
   import yaml
   
   DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "")
   
   default_args = {
       'owner': 'IMEDA',
       'start_date': days_ago(2),
       'weight_rule': "upstream"
   }
   
   ## CHECK
   def check_file(**kwargs):
       path = kwargs["dag_run"].conf['path']
        blacklist = Variable.get("manufacturing_blacklist_masp", deserialize_json=True)
       if any(s in path for s in blacklist):
           return "blacklist"
       else:
           return "pick_extract_mode"
   
    def pick_extract_mode(**kwargs):
        path = kwargs["dag_run"].conf['path']
        # The new extract format is flagged by a line starting with "<Info>"
        with open(path, encoding='latin_1') as f:
            for line in f:
                if line.startswith("<Info>"):
                    return "new_extract"
        return "old_extract"
   
   ## EXTRACT
    def get_header_fields(path, df, comment):
        with open(path, encoding='latin_1') as f:
            for line in f:
                if line.startswith(comment + "\tNo de série :"):
                    df["batch_id"] = re.search(r'.*No de série :\t([a-zA-Z0-9]*)', line).group(1)
                    n_plaque = re.search(r'.*N° plaque :\t([0-9]*)', line).group(1)
                    # Fill with zeros to keep order 01, 02, ..., 10, 11, ...
                    if len(n_plaque) < 2:
                        n_plaque = n_plaque.zfill(2)
                    df["n_plaque"] = n_plaque
                elif line.startswith(comment + "\tProduit :"):
                    df["produit"] = re.search(r'.*Produit :\t(.*)\ttechnologie :', line).group(1)
                    df["technologie"] = re.search(r'.*technologie :\t(.*)\tOpérateur', line).group(1)
                elif line.startswith(comment + "\tFichier limites :"):
                    limit_file = re.search(r'\tFichier limites :\t(.*)\tdu :', line).group(1)
                    df["limit_file"] = limit_file.lower().replace(" ", "")
        return df
   
    def old_extract(**kwargs):
        path = kwargs["dag_run"].conf['path']
        header_size = _template_slave.header(path, "Puce N°")
        raw_df = pd.read_csv(path, header=header_size, sep="\t", engine='python',
                             encoding='latin_1', encoding_errors='ignore', on_bad_lines='skip')
        # Keep only rows with more than 6 non-NaN values
        df = raw_df[raw_df.count(axis=1) > 6][1:]
        df = get_header_fields(path, df, comment="")
        str_columns = [col for col in df.columns if isinstance(col, str)]
        df[str_columns] = df[str_columns].rename(columns=str.lower)\
                                         .rename(columns=_template_slave.remove_accents)\
                                         .rename(columns=_template_slave.remove_special_characters)
        return df
   
    def new_extract(**kwargs):
        path = kwargs["dag_run"].conf['path']
        df = pd.read_csv(path, comment='#', sep="\t", header=[0, 1], engine='python',
                         encoding='latin_1', encoding_errors='ignore', on_bad_lines='skip')
        df.columns = df.columns.map('_'.join)
        str_columns = [col for col in df.columns if isinstance(col, str)]
        df[str_columns] = df[str_columns].rename(columns=str.lower)\
                                         .rename(columns=_template_slave.remove_accents)\
                                         .rename(columns=_template_slave.remove_special_characters)
        df = get_header_fields(path, df, comment="###\t")
        # If the file contains a closing marker ("Fichier clôturé"), drop the trailing row too
        with open(path, encoding='latin_1') as f:
            for line in f:
                if line.startswith("Fichier clôturé"):
                    return df[1:-1]
        return df[1:]
   
   ## TRANSFORM
   """def custom_deserialize(df):
       return json.load(df.decode('latin_1'))"""
   
    def concat_time_date(df):
        df['date_heure'] = pd.to_datetime(df['info_date'] + " " + df['info_heure'], dayfirst=True, errors="coerce")
        df = df.drop(['info_date', 'info_heure'], axis=1)
        return df
   
    def transform(**kwargs):
        ti = kwargs['ti']
        # Only one of the two extract branches ran; keep its non-None output
        task_outputs = ti.xcom_pull(task_ids=["old_extract", "new_extract"])
        logging.info(task_outputs)
        extracting_res = [output for output in task_outputs if output is not None]
        df = extracting_res[0]
       df = df.rename(columns={"puce_n":"info_puce_n",
                               "pos_x": "info_pos_x",
                               "pos_y": "info_pos_y",
                               "date" : "info_date",
                               "heure" : "info_heure",
                               'pos_a_x_g' : 'capa_meas_pos_meas_at_x',
                               "status" : "info_status",
                               "csup_a_x_g" : "capa_meas_csup_x_g",
                               'cinf_a_x_g' : 'capa_meas_cinf_x_g',
                               'csup_a_0_g' : 'capa_meas_csup_0_g',
                               'rsup_a_0_g' : 'capa_meas_rsup_0_g',
                               'cinf_a_0_g' : 'capa_meas_cinf_0_g',
                               'rinf_a_0_g' : 'capa_meas_rinf_0_g',
                               'csup_pol' : 'capa_meas_csup_pol',
                               'cinf_pol' : 'capa_meas_cinf_pol',
                               'csup_hyst' : 'capa_meas_csup_hys',
                               'cinf_hyst' : 'capa_meas_cinf_hys',
                               'c_tot' : 'capa_meas_c_tot',
                               'sz' : 'capa_meas_sz_percent',
                               'sz_1' : 'capa_meas_sz_pf',
                               'so' : 'capa_meas_s0_a',
                               'd_pol_sup' : 'capa_meas_delta_csup_pol',
                               'd_pol_inf' : 'capa_meas_delta_cinf_pol',
                               'd_hyst_sup' : 'capa_meas_delta_csup_hys',
                               'd_hyst_inf' : 'capa_meas_delta_cinf_hys',
                               'ampl_a_0' : 'bw_meas_ampl_0_hz',
                               'ampl_x_hz' : 'bw_meas_ampl_x_hz',
                               'fc' : 'bw_meas_fc',
                               'ampl_min' : 'bw_meas_ampl_min',
                               'f_min' : 'bw_meas_f_min',
                               'ampl_res' : 'bw_meas_ampl_res',
                               'f_res' : 'bw_meas_f_res',
                               'mse' : 'bw_meas_mse',
                               'press' : 'bw_meas_press',
                               'q' : 'bw_meas_q'})
       schema = _template_slave.get_schema("test_wafer")
       df = concat_time_date(df)
       df = _template_slave.filter_(df, schema)
       df = _template_slave.cast(df, schema)
       df["location"] = kwargs["dag_run"].conf['location']
       df["src_path"] = kwargs["dag_run"].conf['path']
       return df
   
    ## LOAD
    def load(**kwargs):
        engine = sqlalchemy.create_engine(BaseHook.get_connection("database").get_uri())
        ti = kwargs['ti']
        df = ti.xcom_pull(task_ids='transform')
        try:
            df.to_sql("test_wafer", con=engine, if_exists='append', index=False)
        except sqlalchemy.exc.IntegrityError:
            # Rows that violate a unique constraint are silently skipped
            pass
   
    @provide_session
    def cleanup_xcom(session=None, **kwargs):
        session.query(XCom).filter(XCom.dag_id == DAG_ID)\
                           .filter(XCom.execution_date == kwargs["execution_date"])\
                           .delete()

    with DAG(
       dag_id=DAG_ID,
       default_args=default_args,
       schedule_interval=None,
       max_active_runs=1,
        tags=["manufacturing", "tests"]
   ) as dag:
       check_task = BranchPythonOperator(
           task_id="check_file",
           python_callable=check_file
       )
       blacklist_task = DummyOperator(
           task_id="blacklist"
       )
       pick_extract_mode_task = BranchPythonOperator(
           task_id="pick_extract_mode",
           python_callable=pick_extract_mode
       )
       old_extract_task = PythonOperator(
           task_id='old_extract',
           python_callable=old_extract,
       )
       new_extract_task = PythonOperator(
           task_id='new_extract',
           python_callable=new_extract,
       )
       transform_task = PythonOperator(
           task_id='transform',
           python_callable=transform,
           trigger_rule="none_failed"
       )
       load_task = PythonOperator(
           task_id='load',
           python_callable=load,
           pool="wafer_pool"
       )
       clean_xcom_task = PythonOperator(
           task_id="clean_xcom",
            python_callable=cleanup_xcom,
       )
       check_task >> [pick_extract_mode_task, blacklist_task]
       pick_extract_mode_task >> [old_extract_task, new_extract_task]
        [old_extract_task, new_extract_task] >> transform_task >> load_task >> clean_xcom_task
   ```
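   
   As a side note on the encoding question: `chardet` is imported above but never used, so a minimal sketch like the following (the path is a placeholder) can check what encoding the source files actually use:
   
   ```python
    import chardet

    def detect_encoding(path, sample_size=64 * 1024):
        # chardet works on raw bytes, not decoded text
        with open(path, "rb") as f:
            raw = f.read(sample_size)
        result = chardet.detect(raw)  # e.g. {'encoding': 'ISO-8859-1', 'confidence': 0.73}
        return result["encoding"], result["confidence"]

    # Placeholder path: point this at one of the real extract files
    encoding, confidence = detect_encoding("/path/to/extract.txt")
    print(f"Detected {encoding} (confidence {confidence:.2f})")
   ```
   
   If chardet reports something other than latin-1 / ISO-8859-1 with high confidence, the problem would be in the data files rather than in the DAG source encoding.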

