Hello community, here is the log from the commit of package python-celery-batches for openSUSE:Factory checked in at 2020-03-30 23:06:59 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Comparing /work/SRC/openSUSE:Factory/python-celery-batches (Old) and /work/SRC/openSUSE:Factory/.python-celery-batches.new.3160 (New) ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-celery-batches" Mon Mar 30 23:06:59 2020 rev:3 rq:789802 version:0.3 Changes: -------- --- /work/SRC/openSUSE:Factory/python-celery-batches/python-celery-batches.changes 2019-12-05 17:37:30.805392190 +0100 +++ /work/SRC/openSUSE:Factory/.python-celery-batches.new.3160/python-celery-batches.changes 2020-03-30 23:07:01.764300455 +0200 @@ -1,0 +2,11 @@ +Mon Mar 30 13:47:17 UTC 2020 - Marketa Calabkova <mcalabk...@suse.com> + +- Update to version 0.3 + * Properly set the ``current_task`` when running Batch tasks. + * Call the success signal after a successful run of the Batch task. + * Support running tasks eagerly via the ``Task.apply()`` method. This causes + the task to execute with a batch of a single item. + * Officially support Python 3.7 and 3.8. Drop support for Python 3.4. + * Officially support Celery 4.3 and 4.4. + +------------------------------------------------------------------- Old: ---- celery-batches-0.2.tar.gz New: ---- celery-batches-0.3.tar.gz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ python-celery-batches.spec ++++++ --- /var/tmp/diff_new_pack.WagQDZ/_old 2020-03-30 23:07:03.484301435 +0200 +++ /var/tmp/diff_new_pack.WagQDZ/_new 2020-03-30 23:07:03.512301450 +0200 @@ -1,7 +1,7 @@ # # spec file for package python-celery-batches # -# Copyright (c) 2019 SUSE LINUX GmbH, Nuernberg, Germany. +# Copyright (c) 2020 SUSE LLC # Copyright (c) 2018 Matthias Fehring <buschman...@opensuse.org> # # All modifications and additions to the file contributed by third parties @@ -20,7 +20,7 @@ %{?!python_module:%define python_module() python-%{**} python3-%{**}} %define _pkgname celery-batches Name: python-%{_pkgname} -Version: 0.2 +Version: 0.3 Release: 0 Summary: Django module to process multiple Celery task requests together License: BSD-3-Clause ++++++ celery-batches-0.2.tar.gz -> celery-batches-0.3.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/celery-batches-0.2/.gitignore new/celery-batches-0.3/.gitignore --- old/celery-batches-0.2/.gitignore 2018-04-20 18:59:14.000000000 +0200 +++ new/celery-batches-0.3/.gitignore 2020-01-30 00:19:01.000000000 +0100 @@ -14,3 +14,6 @@ # Coverage related. .coverage htmlcov + +# Editor related. +.idea diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/celery-batches-0.2/.travis.yml new/celery-batches-0.3/.travis.yml --- old/celery-batches-0.2/.travis.yml 2018-04-20 18:59:14.000000000 +0200 +++ new/celery-batches-0.3/.travis.yml 2020-01-30 00:19:01.000000000 +0100 @@ -1,35 +1,64 @@ language: python -sudo: required -dist: trusty +dist: bionic cache: pip python: - '2.7' - - '3.4' - '3.5' - '3.6' + - '3.7' + - '3.8' - 'pypy' - 'pypy3' os: - linux +stages: + - lint + - test env: - CELERY_VERSION=40 - CELERY_VERSION=41 + - CELERY_VERSION=42 + - CELERY_VERSION=43 + - CELERY_VERSION=44 - CELERY_VERSION=master +matrix: + include: + - python: '3.6' + env: TOXENV=flake8 + stage: lint + # Celery 4.3 added support for Python >= 3.7. + exclude: + - python: '3.7' + env: CELERY_VERSION=40 + - python: '3.7' + env: CELERY_VERSION=41 + - python: '3.7' + env: CELERY_VERSION=42 + - python: '3.8' + env: CELERY_VERSION=40 + - python: '3.8' + env: CELERY_VERSION=41 + - python: '3.8' + env: CELERY_VERSION=42 + allow_failures: + - env: CELERY_VERSION=master + - python: pypy before_install: - - export TOXENV=${TRAVIS_PYTHON_VERSION}-celery${CELERY_VERSION} - - | - if [[ "$TOXENV" =~ "pypy" ]]; then - export PYENV_ROOT="$HOME/.pyenv" - if [ -f "$PYENV_ROOT/bin/pyenv" ]; then - cd "$PYENV_ROOT" && git pull - else - rm -rf "$PYENV_ROOT" && git clone --depth 1 https://github.com/pyenv/pyenv.git "$PYENV_ROOT" - fi - "$PYENV_ROOT/bin/pyenv" install "$PYPY_VERSION" - virtualenv --python="$PYENV_ROOT/versions/$PYPY_VERSION/bin/python" "$HOME/virtualenvs/$PYPY_VERSION" - source "$HOME/virtualenvs/$PYPY_VERSION/bin/activate" - which python + # If TOXENV is not set, build it from the Python and Celery versions. + - if [[ -v CELERY_VERSION ]]; then export TOXENV=${TRAVIS_PYTHON_VERSION}-celery${CELERY_VERSION}; fi; env + - | + if [[ "$TOXENV" =~ "pypy" ]]; then + export PYENV_ROOT="$HOME/.pyenv" + if [ -f "$PYENV_ROOT/bin/pyenv" ]; then + cd "$PYENV_ROOT" && git pull + else + rm -rf "$PYENV_ROOT" && git clone --depth 1 https://github.com/pyenv/pyenv.git "$PYENV_ROOT" fi + "$PYENV_ROOT/bin/pyenv" install "$PYPY_VERSION" + virtualenv --python="$PYENV_ROOT/versions/$PYPY_VERSION/bin/python" "$HOME/virtualenvs/$PYPY_VERSION" + source "$HOME/virtualenvs/$PYPY_VERSION/bin/activate" + which python + fi after_success: - | if [[ -v MATRIX_TOXENV || "$TOXENV" =~ "pypy" ]]; then diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/celery-batches-0.2/CHANGELOG.rst new/celery-batches-0.3/CHANGELOG.rst --- old/celery-batches-0.2/CHANGELOG.rst 2018-04-20 18:59:14.000000000 +0200 +++ new/celery-batches-0.3/CHANGELOG.rst 2020-01-30 00:19:01.000000000 +0100 @@ -3,6 +3,16 @@ Changelog ######### +0.3 2020-01-29 +============== + +* Properly set the ``current_task`` when running Batch tasks. +* Call the success signal after a successful run of the Batch task. +* Support running tasks eagerly via the ``Task.apply()`` method. This causes + the task to execute with a batch of a single item. +* Officially support Python 3.7 and 3.8. Drop support for Python 3.4. +* Officially support Celery 4.3 and 4.4. + 0.2 2018-04-20 ============== diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/celery-batches-0.2/README.rst new/celery-batches-0.3/README.rst --- old/celery-batches-0.2/README.rst 2018-04-20 18:59:14.000000000 +0200 +++ new/celery-batches-0.3/README.rst 2020-01-30 00:19:01.000000000 +0100 @@ -8,7 +8,10 @@ History ======= -Celery Batches was part of Celery (as ``celery.contrib.batches``) until Celery -4.0. This is repository includes that history. The Batches code has been updated -to maintain compatible with newer versions of Celery and other fixes. See the -Changelog for details. +Celery Batches was distributed as part of Celery (as ``celery.contrib.batches``) +until Celery 4.0. This project updates the Batches code to maintain compatiblity +with newer versions of Celery and other fixes. See the Changelog for details. + +Additionally, this repository includes the full history of the code from +``celery.contrib.batches``, but rewritten to the ``celery_batches/__init__.py`` +file. diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/celery-batches-0.2/celery_batches/__init__.py new/celery-batches-0.3/celery_batches/__init__.py --- old/celery-batches-0.2/celery_batches/__init__.py 2018-04-20 18:59:14.000000000 +0200 +++ new/celery-batches-0.3/celery_batches/__init__.py 2020-01-30 00:19:01.000000000 +0100 @@ -3,13 +3,24 @@ celery_batches ============== -Experimental task class that buffers messages and processes them as a list. +Experimental task class that buffers messages and processes them as a list. Task +requests are buffered in memory (on a worker) until either the flush count or +flush interval is reached. Once the requests are flushed, they are sent to the +task as a list of :class:`~celery_batches.SimpleRequest` instances. + +It is possible to return a result for each task request by calling +``mark_as_done`` on your results backend. Returning a value from the Batch task +call is only used to provide values to signals and does not populate into the +results backend. .. warning:: For this to work you have to set :setting:`worker_prefetch_multiplier` to zero, or some value where - the final multiplied value is higher than ``flush_every``. + the final multiplied value is higher than ``flush_every``. Note that Celery + will attempt to continually pull data into memory if this is set to zero. + This can cause excessive resource consumption on both Celery workers and the + broker when used with a deep queue. In the future we hope to add the ability to direct batching tasks to a channel with different QoS requirements than the task channel. @@ -45,7 +56,7 @@ import requests from urlparse import urlparse - from celery.contrib.batches import Batches + from celery_batches import Batches wot_api_target = 'https://api.mywot.com/0.4/public_link_json' @@ -86,8 +97,9 @@ from itertools import count from celery import signals, states +from celery._state import _task_stack +from celery.app.task import Context, Task from celery.five import Empty, Queue -from celery.task import Task from celery.utils import noop from celery.utils.log import get_logger from celery.worker.request import Request @@ -127,36 +139,57 @@ send_prerun = signals.task_prerun.send send_postrun = signals.task_postrun.send +send_success = signals.task_success.send SUCCESS = states.SUCCESS FAILURE = states.FAILURE def apply_batches_task(task, args, loglevel, logfile): # Mimics some of the functionality found in celery.app.trace.trace_task. + request_stack = task.request_stack + push_request = request_stack.push + pop_request = request_stack.pop + push_task = _task_stack.push + pop_task = _task_stack.pop + prerun_receivers = signals.task_prerun.receivers postrun_receivers = signals.task_postrun.receivers + success_receivers = signals.task_success.receivers # Corresponds to multiple requests, so generate a new UUID. task_id = uuid() - if prerun_receivers: - send_prerun(sender=task, task_id=task_id, task=task, - args=args, kwargs={}) + push_task(task) + task_request = Context(loglevel=loglevel, logfile=logfile) + push_request(task_request) - task.push_request(loglevel=loglevel, logfile=logfile) try: - result = task(*args) - state = SUCCESS - except Exception as exc: - result = None - state = FAILURE - logger.error('Error: %r', exc, exc_info=True) + # -*- PRE -*- + if prerun_receivers: + send_prerun(sender=task, task_id=task_id, task=task, + args=args, kwargs={}) + + # -*- TRACE -*- + try: + result = task(*args) + state = SUCCESS + except Exception as exc: + result = None + state = FAILURE + logger.error('Error: %r', exc, exc_info=True) + else: + if success_receivers: + send_success(sender=task, result=result) finally: - task.pop_request() - if postrun_receivers: - send_postrun(sender=task, task_id=task_id, task=task, - args=args, kwargs={}, - retval=result, state=state) + try: + if postrun_receivers: + send_postrun(sender=task, task_id=task_id, task=task, + args=args, kwargs={}, + retval=result, state=state) + finally: + pop_task() + pop_request() + return result @@ -255,6 +288,26 @@ return task_message_handler + def apply(self, args=None, kwargs=None, *_args, **_kwargs): + """ + Execute this task locally as a batch of size 1, by blocking until the task returns. + + Arguments: + args (Tuple): positional arguments passed on to the task. + Returns: + celery.result.EagerResult: pre-evaluated result. + """ + request = SimpleRequest( + id=_kwargs.get("task_id", uuid()), + name="batch request", + args=args or (), + kwargs=kwargs or {}, + delivery_info=None, + hostname="localhost", + ) + + return super(Batches, self).apply(([request],), {}, *_args, **_kwargs) + def flush(self, requests): return self.apply_buffer(requests, ([SimpleRequest.from_request(r) for r in requests],)) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/celery-batches-0.2/requirements/pkgutils.txt new/celery-batches-0.3/requirements/pkgutils.txt --- old/celery-batches-0.2/requirements/pkgutils.txt 1970-01-01 01:00:00.000000000 +0100 +++ new/celery-batches-0.3/requirements/pkgutils.txt 2020-01-30 00:19:01.000000000 +0100 @@ -0,0 +1 @@ +flake8>3.5.0 diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/celery-batches-0.2/requirements/test.txt new/celery-batches-0.3/requirements/test.txt --- old/celery-batches-0.2/requirements/test.txt 2018-04-20 18:59:14.000000000 +0200 +++ new/celery-batches-0.3/requirements/test.txt 2020-01-30 00:19:01.000000000 +0100 @@ -1,2 +1,2 @@ -pytest>=3.0,<3.3 +pytest>=3.8.0,<3.9 coverage diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/celery-batches-0.2/setup.cfg new/celery-batches-0.3/setup.cfg --- old/celery-batches-0.2/setup.cfg 2018-04-20 18:59:14.000000000 +0200 +++ new/celery-batches-0.3/setup.cfg 2020-01-30 00:19:01.000000000 +0100 @@ -1,3 +1,7 @@ +[flake8] +# Don't use a strict line limit if it makes the code more readable. +ignore = E501 + [bdist_wheel] universal = 1 diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/celery-batches-0.2/setup.py new/celery-batches-0.3/setup.py --- old/celery-batches-0.2/setup.py 2018-04-20 18:59:14.000000000 +0200 +++ new/celery-batches-0.3/setup.py 2020-01-30 00:19:01.000000000 +0100 @@ -14,7 +14,7 @@ setuptools.setup( name='celery-batches', packages=setuptools.find_packages(), - version='0.2', + version='0.3', description='Experimental task class that buffers messages and processes them as a list.', long_description=long_description(), keywords='task job queue distributed messaging actor', diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/celery-batches-0.2/t/integration/conftest.py new/celery-batches-0.3/t/integration/conftest.py --- old/celery-batches-0.2/t/integration/conftest.py 2018-04-20 18:59:14.000000000 +0200 +++ new/celery-batches-0.3/t/integration/conftest.py 2020-01-30 00:19:01.000000000 +0100 @@ -1,5 +1,6 @@ import pytest + @pytest.fixture(scope='session', params=[1, 2]) def celery_config(request): return { diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/celery-batches-0.2/t/integration/tasks.py new/celery-batches-0.3/t/integration/tasks.py --- old/celery-batches-0.2/t/integration/tasks.py 2018-04-20 18:59:14.000000000 +0200 +++ new/celery-batches-0.3/t/integration/tasks.py 2020-01-30 00:19:01.000000000 +0100 @@ -1,8 +1,7 @@ # -*- coding: utf-8 -*- from __future__ import absolute_import, unicode_literals -from celery import chain, group, shared_task -from celery.exceptions import SoftTimeLimitExceeded +from celery import shared_task from celery.utils.log import get_task_logger from celery_batches import Batches @@ -12,6 +11,7 @@ class Singleton(type): _instances = {} + def __call__(cls, *args, **kwargs): if cls not in cls._instances: cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs) @@ -38,6 +38,7 @@ result += request.args[0] Results().set(result) + return result @shared_task(base=Batches, flush_every=2, flush_interval=1) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/celery-batches-0.2/t/integration/test_batches.py new/celery-batches-0.3/t/integration/test_batches.py --- old/celery-batches-0.2/t/integration/test_batches.py 2018-04-20 18:59:14.000000000 +0200 +++ new/celery-batches-0.3/t/integration/test_batches.py 2020-01-30 00:19:01.000000000 +0100 @@ -5,27 +5,34 @@ from celery import signals from celery.app.task import Task -from celery.result import _set_task_join_will_block, allow_join_result +from celery.result import allow_join_result from celery.contrib.testing.tasks import ping from .tasks import add, cumadd, Results class SignalCounter(object): - def __init__(self, expected_calls): + def __init__(self, expected_calls, callback=None): self.calls = 0 self.expected_calls = expected_calls + self.callback = callback def __call__(self, sender, **kwargs): if isinstance(sender, Task): - sender = sender.name + sender_name = sender.name + else: + sender_name = sender # Ignore pings, those are used to ensure the worker processes tasks. - if sender == 'celery.ping': + if sender_name == 'celery.ping': return self.calls += 1 + # Call the "real" signal, if necessary. + if self.callback: + self.callback(sender, **kwargs) + def assert_calls(self): assert self.calls == self.expected_calls @@ -40,8 +47,32 @@ assert ping.delay().get(timeout=ping_task_timeout) == 'pong' +def test_always_eager(): + """The batch task runs immediately, in the same thread.""" + app = add._get_app() + task_always_eager = app.conf.task_always_eager + app.conf["task_always_eager"] = True + + result = add.delay(1) + + app.conf["task_always_eager"] = task_always_eager + + # An EagerResult that resolve to 1 should be returned. + assert result.get() == 1 + assert Results().get() == 1 + + +def test_apply(): + """The batch task runs immediately, in the same thread.""" + result = add.apply(args=(1, )) + + # An EagerResult that resolve to 1 should be returned. + assert result.get() == 1 + assert Results().get() == 1 + + def test_flush_interval(celery_worker): - """The batch runs after the flush interval has elapsed.""" + """The batch task runs after the flush interval has elapsed.""" add.delay(1) # The flush interval is 1 second, this is longer. @@ -54,7 +85,7 @@ def test_flush_calls(celery_worker): - """The batch runs after two calls.""" + """The batch task runs after two calls.""" add.delay(1) add.delay(3) @@ -65,6 +96,7 @@ def test_result(celery_worker): + """Each task call can return a result.""" result_1 = cumadd.delay(1) result_2 = cumadd.delay(2) @@ -76,7 +108,7 @@ def test_signals(celery_app, celery_worker): - """The batch runs after two calls.""" + """Ensure that Celery signals run for the batch task.""" # Configure a SignalCounter for each task signal. checks = ( # Each task request gets published separately. @@ -87,7 +119,7 @@ (signals.task_postrun, 1), # Other task signals are not implemented. (signals.task_retry, 0), - (signals.task_success, 0), + (signals.task_success, 1), (signals.task_failure, 0), (signals.task_revoked, 0), (signals.task_unknown, 0), @@ -99,6 +131,7 @@ sig.connect(counter) signal_counters.append(counter) + # The batch runs after 2 task calls. add.delay(1) add.delay(3) @@ -112,5 +145,24 @@ counter.assert_calls() +def test_current_task(celery_app, celery_worker): + """Ensure the current_task is properly set when running the task.""" + def signal(sender, **kwargs): + assert celery_app.current_task.name == 't.integration.tasks.add' + + counter = SignalCounter(1, signal) + signals.task_prerun.connect(counter) + + # The batch runs after 2 task calls. + add.delay(1) + add.delay(3) + + # Let the worker work. + _wait_for_ping() + + # Should still have the correct result. + assert Results().get() == 4 + counter.assert_calls() + # TODO # * Test acking diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/celery-batches-0.2/tox.ini new/celery-batches-0.3/tox.ini --- old/celery-batches-0.2/tox.ini 2018-04-20 18:59:14.000000000 +0200 +++ new/celery-batches-0.3/tox.ini 2020-01-30 00:19:01.000000000 +0100 @@ -1,13 +1,24 @@ [tox] envlist = - {2.7,pypy,3.4,3.5,3.6,pypy3}-celery{40,41,master} + {2.7,pypy,3.4,3.5,3.6,pypy3}-celery{40,41,42} + # Celery 4.3 adds support for Python 3.7 and higher. + {3.7,3.8}-celery{43,44,master} + flake8 [testenv] deps= -r{toxinidir}/requirements/test.txt celery40: celery>=4.0,<4.1 + # Kombu 4.2.0 is incompatible with Celery 4.0.x. It is compatible with + # Celery 4.1.1 and 4.2.x. + celery40: kombu<4.2 celery41: celery>=4.1,<4.2 + celery42: celery>=4.2,<4.3 + celery43: celery>=4.3,<4.4 + celery44: celery>=4.4,<4.5 celerymaster: https://codeload.github.com/celery/celery/zip/master + + flake8: -r{toxinidir}/requirements/pkgutils.txt sitepackages = False recreate = False commands = @@ -15,9 +26,15 @@ coverage html basepython = 2.7: python2.7 - 3.4: python3.4 3.5: python3.5 3.6: python3.6 + 3.7: python3.7 + 3.8: python3.8 pypy: pypy pypy3: pypy3 + flake8: python3.6 usedevelop = True + +[testenv:flake8] +commands = + flake8 -j 2 {toxinidir}/celery_batches {toxinidir}/t