ephraimbuddy opened a new issue #17444: URL: https://github.com/apache/airflow/issues/17444
This test is flaky and fails sometimes ``` ____________ TestLocalTaskJob.test_task_sigkill_works_with_retries _____________ self = <tests.jobs.test_local_task_job.TestLocalTaskJob object at 0x7f3a8ec6c6a0> dag_maker = <tests.conftest.dag_maker.<locals>.DagFactory object at 0x7f3aa1490080> def test_task_sigkill_works_with_retries(self, dag_maker): """ Test that ensures that tasks are retried when they receive sigkill """ # use shared memory value so we can properly track value change even if # it's been updated across processes. retry_callback_called = Value('i', 0) task_terminated_externally = Value('i', 1) shared_mem_lock = Lock() def retry_callback(context): with shared_mem_lock: retry_callback_called.value += 1 assert context['dag_run'].dag_id == 'test_mark_failure_2' def task_function(ti): os.kill(os.getpid(), signal.SIGKILL) # This should not happen -- the state change should be noticed and the task should get killed with shared_mem_lock: task_terminated_externally.value = 0 with dag_maker( dag_id='test_mark_failure_2', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'} ): task = PythonOperator( task_id='test_on_failure', python_callable=task_function, retries=1, retry_delay=timedelta(seconds=2), on_retry_callback=retry_callback, ) ti = TaskInstance(task=task, execution_date=DEFAULT_DATE) ti.refresh_from_db() job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor()) job1.task_runner = StandardTaskRunner(job1) job1.task_runner.start() settings.engine.dispose() process = multiprocessing.Process(target=job1.run) process.start() time.sleep(0.4) process.join(timeout=10) ti.refresh_from_db() > assert ti.state == State.UP_FOR_RETRY E AssertionError: assert None == <TaskInstanceState.UP_FOR_RETRY: 'up_for_retry'> E + where None = <TaskInstance: test_mark_failure_2.test_on_failure 2016-01-01 00:00:00+00:00 [None]>.state E + and <TaskInstanceState.UP_FOR_RETRY: 'up_for_retry'> = State.UP_FOR_RETRY tests/jobs/test_local_task_job.py:778: AssertionError ----------------------------- Captured stdout call ----------------------------- Running <TaskInstance: test_mark_failure_2.test_on_failure 2016-01-01T00:00:00+00:00 [None]> on host 9a517fc8fb98 ----------------------------- Captured stderr call ----------------------------- Process Process-116: Traceback (most recent call last): File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context cursor, statement, parameters, context File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute cursor.execute(statement, parameters) File "/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py", line 206, in execute res = self._query(query) File "/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py", line 319, in _query db.query(q) File "/usr/local/lib/python3.6/site-packages/MySQLdb/connections.py", line 259, in query _mysql.connection.query(self, query) MySQLdb._exceptions.IntegrityError: (1062, "Duplicate entry 'test_on_failure-test_mark_failure_2-2016-01-01 00:00:00.000000' for key 'PRIMARY'") ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org