You should try checking the Airflow log files, e.g.:

tail -n 20 $HOME/airflow/airflow-*.log
tail -n 20 $HOME/airflow/airflow-*.err
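With the KubernetesExecutor the per-task log files are usually more telling than the component logs. Assuming you have not changed base_log_folder or log_filename_template from the 2.0 defaults (the paths below are a guess based on those defaults and on $HOME/airflow being your AIRFLOW_HOME), something like:

  # per-task logs under the default base_log_folder
  ls $HOME/airflow/logs/test_python/do-it/
  tail -n 50 "$HOME/airflow/logs/test_python/do-it/2021-02-09T04:14:25.310295+00:00/1.log"

  # if the worker pod is still around, its stdout can also help
  oc get pods | grep testpythondoit
  oc logs <worker-pod-name>

One more suggestion below your quoted message.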
On Mon, Feb 8, 2021 at 6:51 PM Peter Lane <[email protected]> wrote:

> I am running Airflow 2.0.1 with the KubernetesExecutor on OpenShift. Last
> night I finally got a simple PythonOperator test DAG to run, but now the
> DAG tasks mysteriously fail with no indication why. Here is what happens
> when I run the task with DEBUG level logging:
>
> ----------
> $ airflow tasks run test_python do-it 2021-02-09T04:14:25.310295+00:00 --local --pool default_pool --subdir /home/airflow/repo/hs-datalabs/Source/Python/datalabs/airflow/dag/test_python.py
>
> [2021-02-09 04:28:12,070] {settings.py:210} DEBUG - Setting up DB connection pool (PID 17)
> [2021-02-09 04:28:12,071] {settings.py:281} DEBUG - settings.prepare_engine_args(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=17
> [2021-02-09 04:28:12,334] {cli_action_loggers.py:40} DEBUG - Adding <function default_action_log at 0x7f053b740c20> to pre execution callback
> [2021-02-09 04:28:15,235] {cli_action_loggers.py:66} DEBUG - Calling callbacks: [<function default_action_log at 0x7f053b740c20>]
> [2021-02-09 04:28:15,321] {settings.py:210} DEBUG - Setting up DB connection pool (PID 17)
> [2021-02-09 04:28:15,322] {settings.py:243} DEBUG - settings.prepare_engine_args(): Using NullPool
> [2021-02-09 04:28:15,323] {dagbag.py:448} INFO - Filling up the DagBag from /home/airflow/repo/hs-datalabs/Source/Python/datalabs/airflow/dag/test_python.py
> [2021-02-09 04:28:15,426] {dagbag.py:287} DEBUG - Importing /home/airflow/repo/hs-datalabs/Source/Python/datalabs/airflow/dag/test_python.py
> [2021-02-09 04:28:15,466] {dagbag.py:413} DEBUG - Loaded DAG <DAG: test_python>
> [2021-02-09 04:28:15,467] {dagbag.py:287} DEBUG - Importing /usr/local/lib/python3.7/site-packages/airflow/example_dags/example_bash_operator.py
> [2021-02-09 04:28:15,472] {dagbag.py:413} DEBUG - Loaded DAG <DAG: example_bash_operator>
> [2021-02-09 04:28:15,472] {dagbag.py:287} DEBUG - Importing /usr/local/lib/python3.7/site-packages/airflow/example_dags/example_branch_operator.py
> [2021-02-09 04:28:15,477] {dagbag.py:413} DEBUG - Loaded DAG <DAG: example_branch_operator>
> . . .
> [2021-02-09 04:28:15,764] {dagbag.py:287} DEBUG - Importing /usr/local/lib/python3.7/site-packages/airflow/example_dags/tutorial_taskflow_api_etl.py
> [2021-02-09 04:28:15,766] {dagbag.py:413} DEBUG - Loaded DAG <DAG: tutorial_taskflow_api_etl>
> [2021-02-09 04:28:15,766] {dagbag.py:287} DEBUG - Importing /usr/local/lib/python3.7/site-packages/airflow/example_dags/subdags/subdag.py
> [2021-02-09 04:28:15,801] {plugins_manager.py:270} DEBUG - Loading plugins
> [2021-02-09 04:28:15,801] {plugins_manager.py:207} DEBUG - Loading plugins from directory: /home/airflow/airflow/plugins
> [2021-02-09 04:28:15,801] {plugins_manager.py:184} DEBUG - Loading plugins from entrypoints
> [2021-02-09 04:28:16,128] {plugins_manager.py:414} DEBUG - Integrate DAG plugins
> Running <TaskInstance: test_python.do-it 2021-02-09T04:14:25.310295+00:00 [failed]> on host testpythondoit.3c93d77d6b8141348718e2c6467e55a9-debug
> [2021-02-09 04:28:16,442] {cli_action_loggers.py:84} DEBUG - Calling callbacks: []
> [2021-02-09 04:28:16,442] {settings.py:292} DEBUG - Disposing DB connection pool (PID 17)
> ----------
>
> Here is my DAG.
> Note that neither of the callbacks are running either:
>
> ----------
> from pprint import pprint
>
> from airflow import DAG
> from kubernetes.client import models as k8s
> from airflow.operators.python import PythonOperator
> from airflow.utils.dates import days_ago
>
>
> def print_that(datestamp, **kwargs):
>     pprint(kwargs)
>     print(datestamp)
>
>     return "Foobiddy Doobiddy"
>
>
> def on_failure_callback(context):
>     dag_run = context.get('dag_run')
>     task_instances = dag_run.get_task_instances()
>     print(f'Failure: {task_instances}')
>
>
> def on_success_callback(context):
>     dag_run = context.get('dag_run')
>     task_instances = dag_run.get_task_instances()
>     print(f'Success: {task_instances}')
>
>
> with DAG(
>     dag_id='test_python',
>     default_args={'owner': 'airflow'},
>     schedule_interval=None,
>     start_date=days_ago(2),
>     tags=['testing'],
>     on_failure_callback=on_failure_callback,
>     on_success_callback=on_success_callback,
> ) as dag:
>     do_it = PythonOperator(
>         task_id="do-it",
>         python_callable=print_that,
>         executor_config={
>             "pod_override": k8s.V1Pod(
>                 spec=k8s.V1PodSpec(
>                     containers=[
>                         k8s.V1Container(
>                             name="base",
>                             image="docker-registry.default.svc:5000/hsg-data-labs-dev/airflow-worker:1.0.1"
>                         )
>                     ]
>                 )
>             ),
>         },
>     )
>
>     do_it_again = PythonOperator(
>         task_id="do-it-again",
>         python_callable=print_that,
>         executor_config={
>             "pod_override": k8s.V1Pod(
>                 spec=k8s.V1PodSpec(
>                     containers=[
>                         k8s.V1Container(
>                             name="base",
>                             image="docker-registry.default.svc:5000/hsg-data-labs-dev/airflow-worker:1.0.1"
>                         )
>                     ]
>                 )
>             ),
>         },
>     )
>
>     do_it >> do_it_again
> ----------
>
> I would really appreciate some ideas on how I would debug or what might be wrong.
>
> Thanks,
> Peter
>
> --
> This electronic message is intended only for the named recipient, and may
> contain information that is confidential or privileged. If you are not the
> intended recipient, you are hereby notified that any disclosure, copying,
> distribution or use of the contents of this message is strictly prohibited.
> If you have received this message in error or are not the named recipient,
> please notify us immediately by contacting the sender at the electronic mail
> address noted above, and delete and destroy all copies of this message.
> Thank you.
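One other thing that might help, since the DEBUG output above stops right after "Running <TaskInstance: ...>" with no traceback: run the task outside the executor with `airflow tasks test`, which executes the callable in the current process and prints any exception straight to the terminal. A rough sketch, reusing the execution date and --subdir from your run above (adjust if the DAG file lives elsewhere):

  # run the task in-process, without the scheduler/executor or a worker pod
  airflow tasks test test_python do-it 2021-02-09T04:14:25.310295+00:00 \
      --subdir /home/airflow/repo/hs-datalabs/Source/Python/datalabs/airflow/dag/test_python.py

If that succeeds, the failure is more likely in the worker pod itself (image contents, DB access from the pod, or the pod being killed) than in the DAG code.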
