Hi Tomasz,
What do you mean when you say "The mentioned DAG is missing"? I did add the
DAG, but Gmail may have rejected the .py file.
I have attached the file again as .txt.

Thanks,
Mehmet.

Mehmet Ersoy <[email protected]>, 19 Şub 2020 Çar, 15:20 tarihinde
şunu yazdı:

> I'm using Airflow version 1.10.6.
> Yes, this problem occurs in all of my parallel DAGs. I attached one of my
> DAGs in my first mail. In addition, the graph view of my DAG is as follows:
>
> [image: image.png]
>
> Thanks.
>
>
> On Wed, Feb 19, 2020 at 15:09, Tomasz Urbaszek <[email protected]> wrote:
>
>> What version of Airflow do you use?
>>
>> The mentioned DAG is missing, but I'm curious about the "parallel" jobs you
>> are running :) Does this problem occur with only one DAG?
>>
>> T.
>>
>> On Wed, Feb 19, 2020 at 12:58 PM Mehmet Ersoy <[email protected]> wrote:
>>
>> > Hi Tomasz,
>> > For now, I'm syncing the DAGs manually by copying them across the Airflow
>> > hosts, so there is no Git repository etc.
>> > In addition, my configs related to parallelism are as follows:
>> >
>> >
>> > # How many processes CeleryExecutor uses to sync task state.
>> > # 0 means to use max(1, number of cores - 1) processes.
>> > sync_parallelism = 0
>> >
>> > # The amount of parallelism as a setting to the executor. This defines
>> > # the max number of task instances that should run simultaneously
>> > # on this airflow installation
>> > parallelism = 128
>> >
>> > # The number of task instances allowed to run concurrently by the scheduler
>> > dag_concurrency = 128
>> >
>> > # The concurrency that will be used when starting workers with the
>> > # "airflow worker" command. This defines the number of task instances that
>> > # a worker will take, so size up your workers based on the resources on
>> > # your worker box and the nature of your tasks
>> > worker_concurrency = 32
>> >
>> > # The maximum number of active DAG runs per DAG
>> > max_active_runs_per_dag = 32
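>> >
>> > (For reference, with our 4 workers this allows at most 4 x worker_concurrency
>> > = 4 x 32 = 128 running task instances, which matches parallelism = 128.)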
>> >
>> >
>> > Thank you,
>> >
>> > Best regards.
>> >
>> > On Wed, Feb 19, 2020 at 13:12, Tomasz Urbaszek <[email protected]> wrote:
>> >
>> > > Can you please tell me more about your environment? Especially, how do
>> > > you sync your DAGs / logs from Celery workers? I know one setup where
>> > > I've seen an I/O error when writing to a log...
>> > >
>> > > T.
>> > >
>> > >
>> > > On Wed, Feb 19, 2020 at 10:56 AM Mehmet Ersoy <[email protected]> wrote:
>> > > >
>> > > > Hello Friends,
>> > > >
>> > > > I'm new to Airflow and I'm using the Airflow Celery executor with a
>> > > > Postgres backend and a Redis message queue. For now, there are 4 workers,
>> > > > 1 scheduler and 1 web server.
>> > > > I have been preparing parallel Sqoop jobs in my daily DAGs.
>> > > > When I schedule a daily DAG, some task instances often turn to the failed
>> > > > state right after starting, without ever entering the running state. Then
>> > > > I can't see their logs; they are blank. And when I run such a task
>> > > > manually, it runs without any problem.
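>> > > > (For example, clearing the failed task instance in the UI, or running it
>> > > > with something like airflow test EOD_Snapshot <task_id> <execution_date>
>> > > > from the CLI, completes without any problem.)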
>> > > > I don't really know whether there is something inconsistent in the way I
>> > > > wrote my DAG.
>> > > > I have attached one of my DAGs.
>> > > >
>> > > > Thank you in advance,
>> > > > Best Regards.
>> > > > Mehmet.
>> > >
>> >
>> >
>> > --
>> > Mehmet ERSOY
>> >
>>
>>
>> --
>>
>> Tomasz Urbaszek
>> Polidea <https://www.polidea.com/> | Software Engineer
>>
>> M: +48 505 628 493
>> E: [email protected]
>>
>> Unique Tech
>> Check out our projects! <https://www.polidea.com/our-work>
>>
>
>
> --
> Mehmet ERSOY
>


-- 
Mehmet ERSOY
from airflow import DAG
import datetime as dt
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
import json
import os
import cx_Oracle
import time
from datetime import datetime, timedelta
from impala.dbapi import connect
from impala.util import as_pandas


class HadoopOps:
    """Opens a Kerberos-authenticated Impala connection and exposes a cursor."""

    def __init__(self):
        impala_conn = connect(host='hostname', auth_mechanism='GSSAPI',
                              database='default', port=21050, user='username',
                              use_ssl=True, kerberos_service_name='impala')
        self.conn = impala_conn
        self.cur = self.conn.cursor()


class HadoopSnapshotOps:
    def drop_table(self, schema, table_name):
        """Refresh the table's metadata in Impala, then drop the table."""
        hadoop_ops = HadoopOps()
        try:
            hadoop_ops.cur.execute("REFRESH {}.{}".format(schema, table_name))
            time.sleep(2.4)
        except Exception as err:
            # REFRESH fails when Impala does not know the table yet; fall back
            # to a full metadata invalidation.
            print(err)
            hadoop_ops.cur.execute("INVALIDATE METADATA")
            time.sleep(2.4)
        sql = "DROP TABLE IF EXISTS {schema}.{table_name}".format(schema=schema, table_name=table_name)
        hadoop_ops.cur.execute(sql)
        hadoop_ops.conn.close()
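
# Example use of drop_table (hypothetical schema and table names):
#   HadoopSnapshotOps().drop_table('ODS', 'EXT_SNAPSHOT_SOME_TABLE_2020_02_06')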

 

class SqlOps:
    def __init__(self):
        dsn = cx_Oracle.makedsn('hostname', 1521, service_name='mydb')
        con = cx_Oracle.connect(user='user', password='123456', dsn=dsn)
        self.cur = con.cursor()

    def sql_to_dict(self, cur):
        """Return the cursor's result set as a list of column-name -> value dicts."""
        column_names = [d[0] for d in cur.description]
        rows = cur.fetchall()
        return [dict(zip(column_names, row)) for row in rows]

    def get_parameter(self):
        cur = self.cur
        cur.execute("SELECT * FROM SQOOP.EXT_SNAPSHOT_PARAMETER "
                    "WHERE ETL_TYPE='FULL' AND IS_ACTV=1")
        return self.sql_to_dict(cur)

    def get_column_mapping(self, schema, table_name, transfer_type):
        # Builds the value for Sqoop's --map-column-java option from the Oracle
        # data dictionary: NUMBER columns map to java.math.BigDecimal, all other
        # columns to String.
        query = """
        SELECT
        RTRIM(
        XMLAGG(XMLELEMENT(e, column_name ||
        CASE data_type WHEN 'NUMBER'
        THEN '=java.math.BigDecimal'
        ELSE '=String'
        END || ',') ORDER BY column_id).EXTRACT(
                '//text()').getclobval(), ',') COLUMN_MAPPING
        FROM ALL_TAB_COLS WHERE OWNER = '{}' AND TABLE_NAME = '{}'
        """.format(schema, table_name)
        cur = self.cur
        cur.execute(query)
        return self.sql_to_dict(cur)[0]['COLUMN_MAPPING']
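
# For illustration, get_column_mapping returns a string such as
# 'CUSTOMER_ID=java.math.BigDecimal,CUSTOMER_NAME=String' (hypothetical columns).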

# Read the transfer parameters from Oracle. Note that this code runs every time
# the DAG file is parsed, not only when the DAG actually runs.
sql_ops = SqlOps()
params = sql_ops.get_parameter()

schedule = '@daily'
schema = 'ODS'
dag_id = 'EOD_Snapshot'

args = {
    'owner': 'EOD_Snapshot',
    'start_date': dt.datetime(2020, 2, 13)
}

current_date = str(datetime.now().date().strftime('%Y_%m_%d'))

dag = DAG(dag_id, default_args=args, schedule_interval=schedule, catchup=False)

sqoop_job = """
sqoop import
  -D mapred.child.java.opts=-Xmx4000M
  -D mapred.map.tasks.speculative.execution=false
  -D oraoop.jdbc.url.verbatim=true
  --options-file {option_file}
  --verbose
  --as-parquetfile
  --compress
  --compression-codec snappy
  --map-column-java {column_mapping}
  --num-mappers {parallelism}
  --query "SELECT /*FULL(t) NO_PARALLEL(t)*/ t.* FROM {schema}.{table} t WHERE \$CONDITIONS"
  --split-by "{splitby}"
  --target-dir {targetdir}
  --delete-target-dir
  --hive-import
  --hive-database {hive_database}
  --hive-table {hive_table}
"""

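# For illustration only, with hypothetical parameter values the rendered command
# looks roughly like:
#   sqoop import -D mapred.child.java.opts=-Xmx4000M ... --num-mappers 8
#     --query "SELECT /*FULL(t) NO_PARALLEL(t)*/ t.* FROM ODS.SOME_TABLE t WHERE \$CONDITIONS"
#     --target-dir /data/eod/SOME_TABLE_2020_02_13 --delete-target-dir --hive-import
#     --hive-database ODS --hive-table SOME_TABLE_2020_02_13
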
with dag:
    for t in params:
        tmap = sql_ops.get_column_mapping(t['SRC_SCHEMA'],
                                          t['SRC_TABLE_NAME'], t['TRANSFER_TYPE'])
        cmd = sqoop_job.format(
            option_file=t['OPTION_FILE'],
            column_mapping=tmap,
            parallelism=t['PARALLELISM'],
            schema=t['SRC_SCHEMA'],
            table=t['SRC_TABLE_NAME'],
            splitby=t['SPLIT_BY'],
            targetdir=t['TARGET_DIR'].format(current_date=current_date),
            hive_database=t['TARGET_SCHEMA'],
            hive_table=t['TARGET_TABLE'].format(current_date=current_date))

        # One Sqoop import task per parameter row.
        etl_start = BashOperator(task_id='EXT_SNAPSHOT_{}_{}_SQOOP'.format(t['SRC_SCHEMA'], t['SRC_TABLE_NAME']),
                                 trigger_rule='all_done',
                                 pool='ods_etl_eod_pool',
                                 bash_command=cmd.replace('\n', ''),
                                 dag=dag)

        # Work out which dated snapshot table has aged out of the retention window.
        # Note: will_removed_table is only (re)assigned when RETENTION_PERIODICITY
        # is 'day'; otherwise the value from the previous loop iteration is reused.
        if t['RETENTION_PERIODICITY'] == 'day':
            will_removed_table = t['TARGET_TABLE'].format(
                current_date=(datetime.now()
                              - timedelta(days=t['RETENTION_PERIODICITY_TIME'])).strftime('%Y_%m_%d'))
            print(will_removed_table + " will drop")

        # Drop the expired snapshot after the import task finishes.
        hadoop_snapshot_ops = HadoopSnapshotOps()
        drop_retention_data = PythonOperator(task_id='DROP_{}_{}'.format(t['TARGET_SCHEMA'], will_removed_table),
                                             trigger_rule='all_success',
                                             python_callable=hadoop_snapshot_ops.drop_table,
                                             pool='ods_etl_eod_pool',
                                             op_args=[t['TARGET_SCHEMA'], will_removed_table],
                                             dag=dag)

        etl_start >> drop_retention_data
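
# For a hypothetical parameter row for table SOME_TABLE, each DAG run builds the
# two-task chain:
#   EXT_SNAPSHOT_ODS_SOME_TABLE_SQOOP >> DROP_ODS_SOME_TABLE_<retention_date>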
