ashb closed pull request #4207: [AIRFLOW-3367] Run celery integration test with redis broker. URL: https://github.com/apache/incubator-airflow/pull/4207
This is a PR merged from a forked repository. Because GitHub hides the original diff of a foreign (forked) pull request once it is merged, the diff is reproduced below for the sake of provenance: diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py index 954e17ca03..e85979dace 100644 --- a/tests/executors/test_celery_executor.py +++ b/tests/executors/test_celery_executor.py @@ -16,20 +16,22 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import os import sys import unittest +import contextlib from multiprocessing import Pool import mock -from celery.contrib.testing.worker import start_worker -from airflow.executors import celery_executor -from airflow.executors.celery_executor import CELERY_FETCH_ERR_MSG_HEADER -from airflow.executors.celery_executor import (CeleryExecutor, celery_configuration, - send_task_to_executor, execute_command) -from airflow.executors.celery_executor import app +from celery import Celery from celery import states as celery_states +from celery.contrib.testing.worker import start_worker +from kombu.asynchronous import set_event_loop +from parameterized import parameterized + from airflow.utils.state import State +from airflow.executors import celery_executor from airflow import configuration configuration.load_test_config() @@ -38,48 +40,80 @@ import celery.contrib.testing.tasks # noqa: F401 +def _prepare_test_bodies(): + if 'CELERY_BROKER_URLS' in os.environ: + return [ + (url, ) + for url in os.environ['CELERY_BROKER_URLS'].split(',') + ] + return [(configuration.conf.get('celery', 'BROKER_URL'))] + + class CeleryExecutorTest(unittest.TestCase): + + @contextlib.contextmanager + def _prepare_app(self, broker_url=None, execute=None): + broker_url = broker_url or configuration.conf.get('celery', 'BROKER_URL') + execute = execute or 
celery_executor.execute_command.__wrapped__ + + test_config = dict(celery_executor.celery_configuration) + test_config.update({'broker_url': broker_url}) + test_app = Celery(broker_url, config_source=test_config) + test_execute = test_app.task(execute) + patch_app = mock.patch('airflow.executors.celery_executor.app', test_app) + patch_execute = mock.patch('airflow.executors.celery_executor.execute_command', test_execute) + + with patch_app, patch_execute: + try: + yield test_app + finally: + # Clear event loop to tear down each celery instance + set_event_loop(None) + + @parameterized.expand(_prepare_test_bodies()) @unittest.skipIf('sqlite' in configuration.conf.get('core', 'sql_alchemy_conn'), "sqlite is configured with SequentialExecutor") - def test_celery_integration(self): - executor = CeleryExecutor() - executor.start() - with start_worker(app=app, logfile=sys.stdout, loglevel='debug'): - success_command = ['true', 'some_parameter'] - fail_command = ['false', 'some_parameter'] - - cached_celery_backend = execute_command.backend - task_tuples_to_send = [('success', 'fake_simple_ti', success_command, - celery_configuration['task_default_queue'], - execute_command), - ('fail', 'fake_simple_ti', fail_command, - celery_configuration['task_default_queue'], - execute_command)] - - chunksize = executor._num_tasks_per_send_process(len(task_tuples_to_send)) - num_processes = min(len(task_tuples_to_send), executor._sync_parallelism) - - send_pool = Pool(processes=num_processes) - key_and_async_results = send_pool.map( - send_task_to_executor, - task_tuples_to_send, - chunksize=chunksize) - - send_pool.close() - send_pool.join() - - for key, command, result in key_and_async_results: - # Only pops when enqueued successfully, otherwise keep it - # and expect scheduler loop to deal with it. 
- result.backend = cached_celery_backend - executor.running[key] = command - executor.tasks[key] = result - executor.last_state[key] = celery_states.PENDING - - executor.running['success'] = True - executor.running['fail'] = True - - executor.end(synchronous=True) + def test_celery_integration(self, broker_url): + with self._prepare_app(broker_url) as app: + executor = celery_executor.CeleryExecutor() + executor.start() + + with start_worker(app=app, logfile=sys.stdout, loglevel='debug'): + success_command = ['true', 'some_parameter'] + fail_command = ['false', 'some_parameter'] + + cached_celery_backend = celery_executor.execute_command.backend + task_tuples_to_send = [('success', 'fake_simple_ti', success_command, + celery_executor.celery_configuration['task_default_queue'], + celery_executor.execute_command), + ('fail', 'fake_simple_ti', fail_command, + celery_executor.celery_configuration['task_default_queue'], + celery_executor.execute_command)] + + chunksize = executor._num_tasks_per_send_process(len(task_tuples_to_send)) + num_processes = min(len(task_tuples_to_send), executor._sync_parallelism) + + send_pool = Pool(processes=num_processes) + key_and_async_results = send_pool.map( + celery_executor.send_task_to_executor, + task_tuples_to_send, + chunksize=chunksize) + + send_pool.close() + send_pool.join() + + for key, command, result in key_and_async_results: + # Only pops when enqueued successfully, otherwise keep it + # and expect scheduler loop to deal with it. 
+ result.backend = cached_celery_backend + executor.running[key] = command + executor.tasks[key] = result + executor.last_state[key] = celery_states.PENDING + + executor.running['success'] = True + executor.running['fail'] = True + + executor.end(synchronous=True) self.assertTrue(executor.event_buffer['success'], State.SUCCESS) self.assertTrue(executor.event_buffer['fail'], State.FAILED) @@ -93,38 +127,39 @@ def test_celery_integration(self): @unittest.skipIf('sqlite' in configuration.conf.get('core', 'sql_alchemy_conn'), "sqlite is configured with SequentialExecutor") def test_error_sending_task(self): - @app.task def fake_execute_command(): pass - # fake_execute_command takes no arguments while execute_command takes 1, - # which will cause TypeError when calling task.apply_async() - celery_executor.execute_command = fake_execute_command - executor = CeleryExecutor() - value_tuple = 'command', '_', 'queue', 'should_be_a_simple_ti' - executor.queued_tasks['key'] = value_tuple - executor.heartbeat() + with self._prepare_app(execute=fake_execute_command): + # fake_execute_command takes no arguments while execute_command takes 1, + # which will cause TypeError when calling task.apply_async() + executor = celery_executor.CeleryExecutor() + value_tuple = 'command', '_', 'queue', 'should_be_a_simple_ti' + executor.queued_tasks['key'] = value_tuple + executor.heartbeat() self.assertEquals(1, len(executor.queued_tasks)) self.assertEquals(executor.queued_tasks['key'], value_tuple) def test_exception_propagation(self): - @app.task - def fake_celery_task(): - return {} + with self._prepare_app() as app: + @app.task + def fake_celery_task(): + return {} + + mock_log = mock.MagicMock() + executor = celery_executor.CeleryExecutor() + executor._log = mock_log - mock_log = mock.MagicMock() - executor = CeleryExecutor() - executor._log = mock_log + executor.tasks = {'key': fake_celery_task()} + executor.sync() - executor.tasks = {'key': fake_celery_task()} - executor.sync() 
mock_log.error.assert_called_once() args, kwargs = mock_log.error.call_args_list[0] log = args[0] # Result of queuing is not a celery task but a dict, # and it should raise AttributeError and then get propagated # to the error log. - self.assertIn(CELERY_FETCH_ERR_MSG_HEADER, log) + self.assertIn(celery_executor.CELERY_FETCH_ERR_MSG_HEADER, log) self.assertIn('AttributeError', log) diff --git a/tox.ini b/tox.ini index 513bd83b1c..6065f9d072 100644 --- a/tox.ini +++ b/tox.ini @@ -43,6 +43,7 @@ setenv = MINICLUSTER_HOME=/tmp/minicluster-1.1-SNAPSHOT KRB5_CONFIG=/etc/krb5.conf KRB5_KTNAME=/etc/airflow.keytab + CELERY_BROKER_URLS=amqp://guest:guest@rabbitmq:5672,redis://redis:6379/0 backend_mysql: AIRFLOW__CORE__SQL_ALCHEMY_CONN=mysql://root@mysql/airflow backend_mysql: AIRFLOW__CELERY__RESULT_BACKEND=db+mysql://root@mysql/airflow backend_postgres: AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://postgres:airflow@postgres/airflow ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services