I am running Airflow 2.0.1 with the KubernetesExecutor on OpenShift. Last
night I finally got a simple PythonOperator test DAG to run, but now the DAG's
tasks mysteriously fail with no indication of why. Here is what happens when I
run the task with DEBUG-level logging:
----------
$ airflow tasks run test_python do-it 2021-02-09T04:14:25.310295+00:00 --local
--pool default_pool --subdir
/home/airflow/repo/hs-datalabs/Source/Python/datalabs/airflow/dag/test_python.py
[2021-02-09 04:28:12,070] {settings.py:210} DEBUG - Setting up DB connection
pool (PID 17)
[2021-02-09 04:28:12,071] {settings.py:281} DEBUG -
settings.prepare_engine_args(): Using pool settings. pool_size=5,
max_overflow=10, pool_recycle=1800, pid=17
[2021-02-09 04:28:12,334] {cli_action_loggers.py:40} DEBUG - Adding <function
default_action_log at 0x7f053b740c20> to pre execution callback
[2021-02-09 04:28:15,235] {cli_action_loggers.py:66} DEBUG - Calling callbacks:
[<function default_action_log at 0x7f053b740c20>]
[2021-02-09 04:28:15,321] {settings.py:210} DEBUG - Setting up DB connection
pool (PID 17)
[2021-02-09 04:28:15,322] {settings.py:243} DEBUG -
settings.prepare_engine_args(): Using NullPool
[2021-02-09 04:28:15,323] {dagbag.py:448} INFO - Filling up the DagBag from
/home/airflow/repo/hs-datalabs/Source/Python/datalabs/airflow/dag/test_python.py
[2021-02-09 04:28:15,426] {dagbag.py:287} DEBUG - Importing
/home/airflow/repo/hs-datalabs/Source/Python/datalabs/airflow/dag/test_python.py
[2021-02-09 04:28:15,466] {dagbag.py:413} DEBUG - Loaded DAG <DAG: test_python>
[2021-02-09 04:28:15,467] {dagbag.py:287} DEBUG - Importing
/usr/local/lib/python3.7/site-packages/airflow/example_dags/example_bash_operator.py
[2021-02-09 04:28:15,472] {dagbag.py:413} DEBUG - Loaded DAG <DAG:
example_bash_operator>
[2021-02-09 04:28:15,472] {dagbag.py:287} DEBUG - Importing
/usr/local/lib/python3.7/site-packages/airflow/example_dags/example_branch_operator.py
[2021-02-09 04:28:15,477] {dagbag.py:413} DEBUG - Loaded DAG <DAG:
example_branch_operator>
. . .
[2021-02-09 04:28:15,764] {dagbag.py:287} DEBUG - Importing
/usr/local/lib/python3.7/site-packages/airflow/example_dags/tutorial_taskflow_api_etl.py
[2021-02-09 04:28:15,766] {dagbag.py:413} DEBUG - Loaded DAG <DAG:
tutorial_taskflow_api_etl>
[2021-02-09 04:28:15,766] {dagbag.py:287} DEBUG - Importing
/usr/local/lib/python3.7/site-packages/airflow/example_dags/subdags/subdag.py
[2021-02-09 04:28:15,801] {plugins_manager.py:270} DEBUG - Loading plugins
[2021-02-09 04:28:15,801] {plugins_manager.py:207} DEBUG - Loading plugins from
directory: /home/airflow/airflow/plugins
[2021-02-09 04:28:15,801] {plugins_manager.py:184} DEBUG - Loading plugins from
entrypoints
[2021-02-09 04:28:16,128] {plugins_manager.py:414} DEBUG - Integrate DAG plugins
Running <TaskInstance: test_python.do-it 2021-02-09T04:14:25.310295+00:00
[failed]> on host testpythondoit.3c93d77d6b8141348718e2c6467e55a9-debug
[2021-02-09 04:28:16,442] {cli_action_loggers.py:84} DEBUG - Calling callbacks:
[]
[2021-02-09 04:28:16,442] {settings.py:292} DEBUG - Disposing DB connection
pool (PID 17)
----------
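The only hint I can see is that the task instance already shows up as [failed]
on the "Running <TaskInstance: ...>" line, and nothing from my callable or the
callbacks ever prints. As a sanity check, this is roughly how I can read the
recorded state straight out of the metadata DB (just a sketch, run from a pod
that has my Airflow config; nothing here is specific to my setup):
----------
from airflow.models import TaskInstance
from airflow.utils import timezone
from airflow.utils.session import create_session

# Look up the same task instance the log above mentions and print whatever
# state the metadata DB has recorded for it.
with create_session() as session:
    ti = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == "test_python",
            TaskInstance.task_id == "do-it",
            TaskInstance.execution_date
            == timezone.parse("2021-02-09T04:14:25.310295+00:00"),
        )
        .one_or_none()
    )
    print(ti.state if ti else "no task instance found")
----------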
Here is my DAG. Note that neither of the callbacks runs, either:
----------
from pprint import pprint

from airflow import DAG
from kubernetes.client import models as k8s
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


def print_that(datestamp, **kwargs):
    pprint(kwargs)
    print(datestamp)
    return "Foobiddy Doobiddy"


def on_failure_callback(context):
    dag_run = context.get('dag_run')
    task_instances = dag_run.get_task_instances()
    print(f'Failure: {task_instances}')


def on_success_callback(context):
    dag_run = context.get('dag_run')
    task_instances = dag_run.get_task_instances()
    print(f'Success: {task_instances}')


with DAG(
    dag_id='test_python',
    default_args={'owner': 'airflow'},
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['testing'],
    on_failure_callback=on_failure_callback,
    on_success_callback=on_success_callback,
) as dag:
    do_it = PythonOperator(
        task_id="do-it",
        python_callable=print_that,
        executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            image="docker-registry.default.svc:5000/hsg-data-labs-dev/airflow-worker:1.0.1",
                        )
                    ]
                )
            ),
        },
    )

    do_it_again = PythonOperator(
        task_id="do-it-again",
        python_callable=print_that,
        executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            image="docker-registry.default.svc:5000/hsg-data-labs-dev/airflow-worker:1.0.1",
                        )
                    ]
                )
            ),
        },
    )

    do_it >> do_it_again
----------
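In case the pod_override is somehow involved, this is the stripped-down
variant I plan to try next (just a sketch: same style of callable, no
executor_config and no callbacks, and the test_python_minimal id is only a
placeholder):
----------
from pprint import pprint

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


def print_context(**kwargs):
    # Just dump the task context so I can see the task actually executed.
    pprint(kwargs)
    return "Foobiddy Doobiddy"


with DAG(
    dag_id='test_python_minimal',
    default_args={'owner': 'airflow'},
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['testing'],
) as dag:
    PythonOperator(task_id="do-it", python_callable=print_context)
----------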
I would really appreciate some ideas on how to debug this or what might be
wrong.
Thanks,
Peter