Hi Tomasz,
What do you mean when you say "The mentioned DAG is missing"? I did add the
DAG, but Gmail may have rejected the .py file.
I have attached the file again as .txt.
Thanks,
Mehmet.
Mehmet Ersoy <[email protected]>, 19 Şub 2020 Çar, 15:20 tarihinde
şunu yazdı:
> I'm using 1.10.6 version of Airflow.
> Yes, this problem occurs in all of my parallel DAGs. And I attached one of
> my DAGs in my first mail. In addition, the graph view of my DAG is as
> follows:
>
> [image: image.png]
>
> Thanks.
>
>
> On Wed, Feb 19, 2020 at 15:09, Tomasz Urbaszek <[email protected]> wrote:
>
>> What version of Airflow do you use?
>>
>> The mentioned DAG is missing but I'm curious about "parallel" jobs you are
>> running :) Does this problem occur with only one DAG?
>>
>> T.
>>
>> On Wed, Feb 19, 2020 at 12:58 PM Mehmet Ersoy <[email protected]> wrote:
>>
>> > Hi Tomasz,
>> > For now, I'm syncing the DAGs manually by copying them across the
>> > Airflow hosts, so there is no git repository etc.
>> > In addition, my configuration related to parallelism is as follows:
>> >
>> > # How many processes CeleryExecutor uses to sync task state.
>> > # 0 means to use max(1, number of cores - 1) processes.
>> > sync_parallelism = 0
>> >
>> > # The amount of parallelism as a setting to the executor. This defines
>> > # the max number of task instances that should run simultaneously
>> > # on this airflow installation
>> > parallelism = 128
>> >
>> > # The number of task instances allowed to run concurrently by the scheduler
>> > dag_concurrency = 128
>> >
>> > # The concurrency that will be used when starting workers with the
>> > # "airflow worker" command. This defines the number of task instances that
>> > # a worker will take, so size up your workers based on the resources on
>> > # your worker box and the nature of your tasks
>> > worker_concurrency = 32
>> >
>> > # The maximum number of active DAG runs per DAG
>> > max_active_runs_per_dag = 32
>> >
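>> > For reference, a minimal sketch of the per-DAG counterparts of these
>> > global settings, reusing the id and start date of the attached DAG (the
>> > concrete numbers are only illustrative):
>> >
>> > from airflow import DAG
>> > import datetime as dt
>> >
>> > dag = DAG(
>> >     'EOD_Snapshot',
>> >     default_args={'owner': 'EOD_Snapshot',
>> >                   'start_date': dt.datetime(2020, 2, 13)},
>> >     schedule_interval='@daily',
>> >     concurrency=128,      # per-DAG analogue of dag_concurrency
>> >     max_active_runs=32,   # per-DAG analogue of max_active_runs_per_dag
>> >     catchup=False,
>> > )
>> >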
>> > Thank you,
>> >
>> > Best regards.
>> >
>> > On Wed, Feb 19, 2020 at 13:12, Tomasz Urbaszek <[email protected]> wrote:
>> >
>> > > Can you please tell me more about your environment? Especially, how do
>> > > you sync your DAGs / logs from the Celery workers? I know one setup
>> > > where I've seen an I/O error when writing to a log...
>> > >
>> > > T.
>> > >
>> > >
>> > > On Wed, Feb 19, 2020 at 10:56 AM Mehmet Ersoy
>> > > <[email protected]> wrote:
>> > > >
>> > > > Hello Friends,
>> > > >
>> > > > I'm new to Airflow and I'm using the Airflow Celery executor with a
>> > > > Postgres backend and a Redis message queue. For now, there are 4
>> > > > workers, 1 scheduler and 1 web server.
>> > > > I have been preparing parallel Sqoop jobs in my daily DAGs.
>> > > > When I schedule a daily DAG, some task instances often go straight
>> > > > to the failed state after starting, without ever entering the
>> > > > running state. Then I can't see their logs; they are blank. And when
>> > > > I run such a task manually, it runs without any problem.
>> > > > I don't really understand whether there is an inconsistency in the
>> > > > way I have written my DAG.
>> > > > I have attached one of my DAGs.
>> > > >
>> > > > Thank you in advance,
>> > > > Best Regards.
>> > > > Mehmet.
>> > >
>> >
>> >
>> > --
>> > Mehmet ERSOY
>> >
>>
>>
>> --
>>
>> Tomasz Urbaszek
>> Polidea <https://www.polidea.com/> | Software Engineer
>>
>> M: +48 505 628 493
>> E: [email protected]
>>
>> Unique Tech
>> Check out our projects! <https://www.polidea.com/our-work>
>>
>
>
> --
> Mehmet ERSOY
>
--
Mehmet ERSOY
# --- Attached DAG file (resent as .txt) ---
from airflow import DAG
import datetime as dt
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
import json
import os
import cx_Oracle
import time
from datetime import datetime, timedelta
from impala.dbapi import connect
from impala.util import as_pandas


class HadoopOps:
    """Kerberos-authenticated Impala connection holder."""

    def __init__(self):
        impala_conn = connect(host='hostname', auth_mechanism='GSSAPI',
                              database='default', port=21050, user='username',
                              use_ssl=True, kerberos_service_name='impala')
        self.conn = impala_conn
        self.cur = self.conn.cursor()


class HadoopSnapshotOps:
    def drop_table(self, schema, table_name):
        """Refresh Impala metadata for the table, then drop it."""
        hadoop_ops = HadoopOps()
        try:
            hadoop_ops.cur.execute("REFRESH {}.{}".format(schema, table_name))
            time.sleep(2.4)
        except Exception as err:
            print(err)
            hadoop_ops.cur.execute("INVALIDATE METADATA")
            time.sleep(2.4)
        sql = "DROP TABLE IF EXISTS {schema}.{table_name}".format(
            schema=schema, table_name=table_name)
        hadoop_ops.cur.execute(sql)
        hadoop_ops.conn.close()


class SqlOps:
    """Oracle helper used to read the transfer parameters and column types."""

    def __init__(self):
        dsn = cx_Oracle.makedsn('hostname', 1521, service_name='mydb')
        con = cx_Oracle.connect(user='user', password='123456', dsn=dsn)
        self.cur = con.cursor()

    def sql_to_dict(self, cur):
        # Turn the cursor's result set into a list of dicts keyed by column name.
        column_names = [d[0] for d in cur.description]
        rows = cur.fetchall()
        return [dict(zip(column_names, row)) for row in rows]

    def get_parameter(self):
        cur = self.cur
        cur.execute("SELECT * FROM SQOOP.EXT_SNAPSHOT_PARAMETER "
                    "WHERE ETL_TYPE='FULL' AND IS_ACTV=1")
        return self.sql_to_dict(cur)

    def get_column_mapping(self, schema, table_name, transfer_type):
        # Build the --map-column-java argument: NUMBER columns are mapped to
        # java.math.BigDecimal, everything else to String.
        query = """
            SELECT
              RTRIM(
                XMLAGG(XMLELEMENT(e, column_name ||
                  CASE data_type WHEN 'NUMBER'
                       THEN '=java.math.BigDecimal'
                       ELSE '=String'
                  END || ',') ORDER BY column_id).EXTRACT(
                  '//text()').getclobval(), ',') COLUMN_MAPPING
            FROM ALL_TAB_COLS WHERE OWNER = '{}' AND TABLE_NAME = '{}'
        """.format(schema, table_name)
        cur = self.cur
        cur.execute(query)
        return self.sql_to_dict(cur)[0]['COLUMN_MAPPING']


# Note: these queries run every time the scheduler parses this file.
sql_ops = SqlOps()
params = sql_ops.get_parameter()

schedule = '@daily'
schema = 'ODS'
dag_id = 'EOD_Snapshot'
args = {
    'owner': 'EOD_Snapshot',
    'start_date': dt.datetime(2020, 2, 13)
}
current_date = str(datetime.now().date().strftime('%Y_%m_%d'))

dag = DAG(dag_id, default_args=args, schedule_interval=schedule, catchup=False)

sqoop_job = """
sqoop import
-D mapred.child.java.opts=-Xmx4000M
-D mapred.map.tasks.speculative.execution=false
-D oraoop.jdbc.url.verbatim=true
--options-file {option_file}
--verbose
--as-parquetfile
--compress
--compression-codec snappy
--map-column-java {column_mapping}
--num-mappers {parallelism}
--query "SELECT /*FULL(t) NO_PARALLEL(t)*/ t.* FROM {schema}.{table} t WHERE \$CONDITIONS"
--split-by "{splitby}"
--target-dir {targetdir}
--delete-target-dir
--hive-import
--hive-database {hive_database}
--hive-table {hive_table}
"""

with dag:
    # One Sqoop import task per parameter row, plus an optional task that
    # drops the snapshot table that has fallen out of the retention window.
    for t in params:
        tmap = sql_ops.get_column_mapping(t['SRC_SCHEMA'],
                                          t['SRC_TABLE_NAME'],
                                          t['TRANSFER_TYPE'])
        cmd = sqoop_job.format(
            option_file=t['OPTION_FILE'],
            column_mapping=tmap,
            parallelism=t['PARALLELISM'],
            schema=t['SRC_SCHEMA'],
            table=t['SRC_TABLE_NAME'],
            splitby=t['SPLIT_BY'],
            targetdir=t['TARGET_DIR'].format(current_date=current_date),
            hive_database=t['TARGET_SCHEMA'],
            hive_table=t['TARGET_TABLE'].format(current_date=current_date))
        etl_start = BashOperator(
            task_id='EXT_SNAPSHOT_{}_{}_SQOOP'.format(t['SRC_SCHEMA'],
                                                      t['SRC_TABLE_NAME']),
            trigger_rule='all_done',
            pool='ods_etl_eod_pool',
            # Collapse the multi-line template into a single shell command;
            # newlines become spaces so the flags stay separated.
            bash_command=cmd.replace('\n', ' '),
            dag=dag)
        if t['RETENTION_PERIODICITY'] == 'day':
            will_removed_table = t['TARGET_TABLE'].format(
                current_date=(datetime.now() - timedelta(
                    days=t['RETENTION_PERIODICITY_TIME'])).strftime('%Y_%m_%d'))
            print(will_removed_table + " will be dropped")
            hadoop_snapshot_ops = HadoopSnapshotOps()
            drop_retention_data = PythonOperator(
                task_id='DROP_{}_{}'.format(t['TARGET_SCHEMA'],
                                            will_removed_table),
                trigger_rule='all_success',
                python_callable=hadoop_snapshot_ops.drop_table,
                pool='ods_etl_eod_pool',
                op_args=[t['TARGET_SCHEMA'], will_removed_table],
                dag=dag)
            etl_start >> drop_retention_data
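
For a quick sanity check of what one of these tasks actually runs, the
sqoop_job template above can be rendered with sample values; the dictionary
below is purely illustrative (made-up paths and table names) and assumes the
template definition above is in scope:

sample = {
    'option_file': '/path/to/options.txt',
    'column_mapping': 'ID=java.math.BigDecimal,NAME=String',
    'parallelism': 4,
    'schema': 'SRC',
    'table': 'MY_TABLE',
    'splitby': 'ID',
    'targetdir': '/user/etl/MY_TABLE_2020_02_19',
    'hive_database': 'ODS',
    'hive_table': 'MY_TABLE_2020_02_19',
}
# Same join as in the DAG: newlines become spaces to form one shell command.
print(sqoop_job.format(**sample).replace('\n', ' '))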