Script 'mail_helper' called by obssrc
Hello community,
here is the log from the commit of package python-azure-servicebus for
openSUSE:Factory checked in at 2023-10-15 19:28:14
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-azure-servicebus (Old)
and /work/SRC/openSUSE:Factory/.python-azure-servicebus.new.20540 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-azure-servicebus"
Sun Oct 15 19:28:14 2023 rev:30 rq:1117875 version:7.11.3
Changes:
--------
---
/work/SRC/openSUSE:Factory/python-azure-servicebus/python-azure-servicebus.changes
2023-09-15 22:11:34.734229431 +0200
+++
/work/SRC/openSUSE:Factory/.python-azure-servicebus.new.20540/python-azure-servicebus.changes
2023-10-15 19:30:19.154903701 +0200
@@ -1,0 +2,8 @@
+Thu Oct 12 10:07:01 UTC 2023 - John Paul Adrian Glaubitz
<[email protected]>
+
+- New upstream release
+ + Version 7.11.3
+ + For detailed information about changes see the
+ CHANGELOG.md file provided with this package
+
+-------------------------------------------------------------------
Old:
----
azure-servicebus-7.11.2.tar.gz
New:
----
azure-servicebus-7.11.3.tar.gz
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Other differences:
------------------
++++++ python-azure-servicebus.spec ++++++
--- /var/tmp/diff_new_pack.ObhDOQ/_old 2023-10-15 19:30:19.714923875 +0200
+++ /var/tmp/diff_new_pack.ObhDOQ/_new 2023-10-15 19:30:19.718924019 +0200
@@ -21,7 +21,7 @@
%define skip_python2 1
%endif
Name: python-azure-servicebus
-Version: 7.11.2
+Version: 7.11.3
Release: 0
Summary: Microsoft Azure Service Bus Runtime Client Library
License: Apache-2.0
++++++ azure-servicebus-7.11.2.tar.gz -> azure-servicebus-7.11.3.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/azure-servicebus-7.11.2/CHANGELOG.md
new/azure-servicebus-7.11.3/CHANGELOG.md
--- old/azure-servicebus-7.11.2/CHANGELOG.md 2023-09-13 21:58:31.000000000
+0200
+++ new/azure-servicebus-7.11.3/CHANGELOG.md 2023-10-11 22:08:58.000000000
+0200
@@ -1,5 +1,11 @@
# Release History
+## 7.11.3 (2023-10-11)
+
+### Bugs Fixed
+
+- Fixed a bug where `prefetch_count` was not being passed through correctly
and caused messages to not be received as expected when in `RECEIVE_AND_DELETE`
mode ([#31712](https://github.com/Azure/azure-sdk-for-python/issues/31712),
[#31711](https://github.com/Azure/azure-sdk-for-python/issues/31711)).
+
## 7.11.2 (2023-09-13)
### Bugs Fixed
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/azure-servicebus-7.11.2/PKG-INFO
new/azure-servicebus-7.11.3/PKG-INFO
--- old/azure-servicebus-7.11.2/PKG-INFO 2023-09-13 21:59:31.079491600
+0200
+++ new/azure-servicebus-7.11.3/PKG-INFO 2023-10-11 22:10:08.476259200
+0200
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: azure-servicebus
-Version: 7.11.2
+Version: 7.11.3
Summary: Microsoft Azure Service Bus Client Library for Python
Home-page: https://github.com/Azure/azure-sdk-for-python
Author: Microsoft Corporation
@@ -47,10 +47,6 @@
**NOTE**: If you are using version 0.50 or lower and want to migrate to the
latest version
of this package please look at our [migration guide to move from Service Bus
V0.50 to Service Bus V7][migration_guide].
-## _Disclaimer_
-
-_Azure SDK Python packages support for Python 2.7 ended 01 January 2022. For
more information and questions, please refer to
https://github.com/Azure/azure-sdk-for-python/issues/20691_
-
## Getting started
### Install the package
@@ -122,6 +118,10 @@
* [ServiceBusMessage][message_reference]: When sending, this is the type you
will construct to contain your payload. When receiving, this is where you will
access the payload.
+### Thread safety
+
+We do not guarantee that the ServiceBusClient, ServiceBusSender, and
ServiceBusReceiver are thread-safe. We do not recommend reusing these instances
across threads. It is up to the running application to use these classes in a
thread-safe manner.
+
## Examples
The following sections provide several code snippets covering some of the most
common Service Bus tasks, including:
@@ -607,6 +607,12 @@
# Release History
+## 7.11.3 (2023-10-11)
+
+### Bugs Fixed
+
+- Fixed a bug where `prefetch_count` was not being passed through correctly
and caused messages to not be received as expected when in `RECEIVE_AND_DELETE`
mode ([#31712](https://github.com/Azure/azure-sdk-for-python/issues/31712),
[#31711](https://github.com/Azure/azure-sdk-for-python/issues/31711)).
+
## 7.11.2 (2023-09-13)
### Bugs Fixed
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/azure-servicebus-7.11.2/README.md
new/azure-servicebus-7.11.3/README.md
--- old/azure-servicebus-7.11.2/README.md 2023-09-13 21:58:31.000000000
+0200
+++ new/azure-servicebus-7.11.3/README.md 2023-10-11 22:08:58.000000000
+0200
@@ -22,10 +22,6 @@
**NOTE**: If you are using version 0.50 or lower and want to migrate to the
latest version
of this package please look at our [migration guide to move from Service Bus
V0.50 to Service Bus V7][migration_guide].
-## _Disclaimer_
-
-_Azure SDK Python packages support for Python 2.7 ended 01 January 2022. For
more information and questions, please refer to
https://github.com/Azure/azure-sdk-for-python/issues/20691_
-
## Getting started
### Install the package
@@ -97,6 +93,10 @@
* [ServiceBusMessage][message_reference]: When sending, this is the type you
will construct to contain your payload. When receiving, this is where you will
access the payload.
+### Thread safety
+
+We do not guarantee that the ServiceBusClient, ServiceBusSender, and
ServiceBusReceiver are thread-safe. We do not recommend reusing these instances
across threads. It is up to the running application to use these classes in a
thread-safe manner.
+
## Examples
The following sections provide several code snippets covering some of the most
common Service Bus tasks, including:
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/azure/servicebus/_common/auto_lock_renewer.py
new/azure-servicebus-7.11.3/azure/servicebus/_common/auto_lock_renewer.py
--- old/azure-servicebus-7.11.2/azure/servicebus/_common/auto_lock_renewer.py
2023-09-13 21:58:31.000000000 +0200
+++ new/azure-servicebus-7.11.3/azure/servicebus/_common/auto_lock_renewer.py
2023-10-11 22:08:58.000000000 +0200
@@ -10,7 +10,7 @@
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as
FuturesTimeoutError
import queue
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Union, Optional
from .._servicebus_receiver import ServiceBusReceiver
from .._servicebus_session import ServiceBusSession
@@ -19,7 +19,7 @@
from .utils import get_renewable_start_time, utc_now,
get_renewable_lock_duration
if TYPE_CHECKING:
- from typing import Callable, Union, Optional
+ from typing import Callable
Renewable = Union[ServiceBusSession, ServiceBusReceivedMessage]
LockRenewFailureCallback = Callable[[Renewable, Optional[Exception]], None]
@@ -69,12 +69,11 @@
def __init__(
self,
- max_lock_renewal_duration=300,
- on_lock_renew_failure=None,
- executor=None,
- max_workers=None,
- ):
- # type: (float, Optional[LockRenewFailureCallback],
Optional[ThreadPoolExecutor], Optional[int]) -> None
+ max_lock_renewal_duration: float = 300,
+ on_lock_renew_failure: Optional["LockRenewFailureCallback"] = None,
+ executor: Optional[ThreadPoolExecutor] = None,
+ max_workers: Optional[int] = None,
+ ) -> None:
"""Auto renew locks for messages and sessions using a background
thread pool. It is recommended
setting max_worker to a large number or passing ThreadPoolExecutor of
large max_workers number when
AutoLockRenewer is supposed to deal with multiple messages or sessions
simultaneously.
@@ -241,12 +240,11 @@
def register(
self,
- receiver,
- renewable,
- max_lock_renewal_duration=None,
- on_lock_renew_failure=None,
- ):
- # type: (ServiceBusReceiver, Renewable, Optional[float],
Optional[LockRenewFailureCallback]) -> None
+ receiver: ServiceBusReceiver,
+ renewable: Union[ServiceBusReceivedMessage, ServiceBusSession],
+ max_lock_renewal_duration: Optional[float] = None,
+ on_lock_renew_failure: Optional["LockRenewFailureCallback"] = None,
+ ) -> None:
"""Register a renewable entity for automatic lock renewal.
:param receiver: The ServiceBusReceiver instance that is associated
with the message or the session to
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/azure/servicebus/_common/receiver_mixins.py
new/azure-servicebus-7.11.3/azure/servicebus/_common/receiver_mixins.py
--- old/azure-servicebus-7.11.2/azure/servicebus/_common/receiver_mixins.py
2023-09-13 21:58:31.000000000 +0200
+++ new/azure-servicebus-7.11.3/azure/servicebus/_common/receiver_mixins.py
2023-10-11 22:08:58.000000000 +0200
@@ -50,12 +50,11 @@
self._last_received_sequenced_number = None
self._message_iter = None
self._connection = kwargs.get("connection")
- prefetch_count = kwargs.get("prefetch_count", 0)
- if int(prefetch_count) < 0 or int(prefetch_count) > 50000:
+ self._prefetch_count = int(kwargs.get("prefetch_count", 0))
+ if self._prefetch_count < 0 or self._prefetch_count > 50000:
raise ValueError(
"prefetch_count must be an integer between 0 and 50000
inclusive."
)
- self._prefetch_count = prefetch_count + 1
# The relationship between the amount can be received and the time
interval is linear: amount ~= perf * interval
# In large max_message_count case, like 5000, the pull receive would
always return hundreds of messages limited
# by the perf and time.
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/azure/servicebus/_common/utils.py
new/azure-servicebus-7.11.3/azure/servicebus/_common/utils.py
--- old/azure-servicebus-7.11.2/azure/servicebus/_common/utils.py
2023-09-13 21:58:31.000000000 +0200
+++ new/azure-servicebus-7.11.3/azure/servicebus/_common/utils.py
2023-10-11 22:08:58.000000000 +0200
@@ -19,7 +19,8 @@
Union,
Tuple,
cast,
- Callable
+ Callable,
+ Iterable,
)
from datetime import timezone
@@ -58,7 +59,9 @@
Mapping[str, Any],
ServiceBusMessage,
AmqpAnnotatedMessage,
- List[Union[Mapping[str, Any], ServiceBusMessage,
AmqpAnnotatedMessage]],
+ Iterable[Mapping[str, Any]],
+ Iterable[ServiceBusMessage],
+ Iterable[AmqpAnnotatedMessage],
]
SingleMessageType = Union[
@@ -239,7 +242,7 @@
:return: A list of ServiceBusMessage or a single ServiceBusMessage
transformed.
:rtype: ~azure.servicebus.ServiceBusMessage or
list[~azure.servicebus.ServiceBusMessage]
"""
- if isinstance(messages, list):
+ if isinstance(messages, Iterable) and not isinstance(messages, Mapping):
return [
_convert_to_single_service_bus_message(m, message_type,
to_outgoing_amqp_message) for m in messages
]
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/azure/servicebus/_pyamqp/aio/_client_async.py
new/azure-servicebus-7.11.3/azure/servicebus/_pyamqp/aio/_client_async.py
--- old/azure-servicebus-7.11.2/azure/servicebus/_pyamqp/aio/_client_async.py
2023-09-13 21:58:31.000000000 +0200
+++ new/azure-servicebus-7.11.3/azure/servicebus/_pyamqp/aio/_client_async.py
2023-10-11 22:08:58.000000000 +0200
@@ -38,7 +38,6 @@
AMQPException,
MessageException
)
-from ..constants import LinkState
_logger = logging.getLogger(__name__)
@@ -724,7 +723,7 @@
if not self._link:
self._link = self._session.create_receiver_link(
source_address=self.source,
- link_credit=self._link_credit,
+ link_credit=0, # link_credit=0 on flow frame sent before
client is ready
send_settle_mode=self._send_settle_mode,
rcv_settle_mode=self._receive_settle_mode,
max_message_size=self._max_message_size,
@@ -749,7 +748,7 @@
"""
try:
if self._link.current_link_credit == 0:
- await self._link.flow()
+ await self._link.flow(link_credit=self._link_credit)
await self._connection.listen(wait=self._socket_timeout, **kwargs)
except ValueError:
_logger.info("Timeout reached, closing receiver.",
extra=self._network_trace_params)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/azure/servicebus/_pyamqp/aio/_link_async.py
new/azure-servicebus-7.11.3/azure/servicebus/_pyamqp/aio/_link_async.py
--- old/azure-servicebus-7.11.2/azure/servicebus/_pyamqp/aio/_link_async.py
2023-09-13 21:58:31.000000000 +0200
+++ new/azure-servicebus-7.11.3/azure/servicebus/_pyamqp/aio/_link_async.py
2023-10-11 22:08:58.000000000 +0200
@@ -65,7 +65,8 @@
capabilities=kwargs.get("target_capabilities"),
)
)
- self.link_credit = kwargs.pop("link_credit", None) or
DEFAULT_LINK_CREDIT
+ link_credit = kwargs.get("link_credit")
+ self.link_credit = link_credit if link_credit is not None else
DEFAULT_LINK_CREDIT
self.current_link_credit = self.link_credit
self.send_settle_mode = kwargs.pop("send_settle_mode",
SenderSettleMode.Mixed)
self.rcv_settle_mode = kwargs.pop("rcv_settle_mode",
ReceiverSettleMode.First)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/azure/servicebus/_pyamqp/client.py
new/azure-servicebus-7.11.3/azure/servicebus/_pyamqp/client.py
--- old/azure-servicebus-7.11.2/azure/servicebus/_pyamqp/client.py
2023-09-13 21:58:31.000000000 +0200
+++ new/azure-servicebus-7.11.3/azure/servicebus/_pyamqp/client.py
2023-10-11 22:08:58.000000000 +0200
@@ -827,7 +827,7 @@
if not self._link:
self._link = self._session.create_receiver_link(
source_address=self.source,
- link_credit=self._link_credit,
+ link_credit=0, # link_credit=0 on flow frame sent before
client is ready
send_settle_mode=self._send_settle_mode,
rcv_settle_mode=self._receive_settle_mode,
max_message_size=self._max_message_size,
@@ -852,7 +852,7 @@
"""
try:
if self._link.current_link_credit == 0:
- self._link.flow()
+ self._link.flow(link_credit=self._link_credit)
self._connection.listen(wait=self._socket_timeout, **kwargs)
except ValueError:
_logger.info("Timeout reached, closing receiver.",
extra=self._network_trace_params)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/azure/servicebus/_pyamqp/link.py
new/azure-servicebus-7.11.3/azure/servicebus/_pyamqp/link.py
--- old/azure-servicebus-7.11.2/azure/servicebus/_pyamqp/link.py
2023-09-13 21:58:31.000000000 +0200
+++ new/azure-servicebus-7.11.3/azure/servicebus/_pyamqp/link.py
2023-10-11 22:08:58.000000000 +0200
@@ -63,7 +63,8 @@
capabilities=kwargs.get("target_capabilities"),
)
)
- self.link_credit = kwargs.pop("link_credit", None) or
DEFAULT_LINK_CREDIT
+ link_credit = kwargs.get("link_credit")
+ self.link_credit = link_credit if link_credit is not None else
DEFAULT_LINK_CREDIT
self.current_link_credit = self.link_credit
self.send_settle_mode = kwargs.pop("send_settle_mode",
SenderSettleMode.Mixed)
self.rcv_settle_mode = kwargs.pop("rcv_settle_mode",
ReceiverSettleMode.First)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/azure/servicebus/_servicebus_client.py
new/azure-servicebus-7.11.3/azure/servicebus/_servicebus_client.py
--- old/azure-servicebus-7.11.2/azure/servicebus/_servicebus_client.py
2023-09-13 21:58:31.000000000 +0200
+++ new/azure-servicebus-7.11.3/azure/servicebus/_servicebus_client.py
2023-10-11 22:08:58.000000000 +0200
@@ -368,8 +368,12 @@
performance but increase the chance that messages will expire while
they are cached if they're not
processed fast enough.
The default value is 0, meaning messages will be received from the
service and processed one at a time.
- In the case of prefetch_count being 0, `ServiceBusReceiver.receive`
would try to cache `max_message_count`
- (if provided) within its request to the service.
+ In the case of prefetch_count being 0,
`ServiceBusReceiver.receive_messages` would try to cache
+ `max_message_count` (if provided) within its request to the service.
+ **WARNING: If prefetch_count > 0 and RECEIVE_AND_DELETE mode is used,
all prefetched messages will stay in
+ the in-memory prefetch buffer until they're received into the
application. If the application ends before
+ the messages are received into the application, those messages will
be lost and unable to be recovered.
+ Therefore, it's recommended that PEEK_LOCK mode be used with prefetch.
:keyword str client_identifier: A string-based identifier to uniquely
identify the receiver instance.
Service Bus will associate it with some error messages for easier
correlation of errors.
If not specified, a unique id will be generated.
@@ -546,8 +550,12 @@
performance but increase the chance that messages will expire while
they are cached if they're not
processed fast enough.
The default value is 0, meaning messages will be received from the
service and processed one at a time.
- In the case of prefetch_count being 0, `ServiceBusReceiver.receive`
would try to cache `max_message_count`
- (if provided) within its request to the service.
+ In the case of prefetch_count being 0,
`ServiceBusReceiver.receive_messages` would try to cache
+ `max_message_count` (if provided) within its request to the service.
+ **WARNING: If prefetch_count > 0 and RECEIVE_AND_DELETE mode is used,
all prefetched messages will stay in
+ the in-memory prefetch buffer until they're received into the
application. If the application ends before
+ the messages are received into the application, those messages will
be lost and unable to be recovered.
+ Therefore, it's recommended that PEEK_LOCK mode be used with prefetch.
:keyword str client_identifier: A string-based identifier to uniquely
identify the receiver instance.
Service Bus will associate it with some error messages for easier
correlation of errors.
If not specified, a unique id will be generated.
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/azure/servicebus/_servicebus_receiver.py
new/azure-servicebus-7.11.3/azure/servicebus/_servicebus_receiver.py
--- old/azure-servicebus-7.11.2/azure/servicebus/_servicebus_receiver.py
2023-09-13 21:58:31.000000000 +0200
+++ new/azure-servicebus-7.11.3/azure/servicebus/_servicebus_receiver.py
2023-10-11 22:08:58.000000000 +0200
@@ -130,8 +130,12 @@
performance but increase the chance that messages will expire while they
are cached if they're not
processed fast enough.
The default value is 0, meaning messages will be received from the
service and processed one at a time.
- In the case of prefetch_count being 0, `ServiceBusReceiver.receive` would
try to cache `max_message_count`
- (if provided) within its request to the service.
+ In the case of prefetch_count being 0,
`ServiceBusReceiver.receive_messages` would try to cache
+ `max_message_count` (if provided) within its request to the service.
+ **WARNING: If prefetch_count > 0 and RECEIVE_AND_DELETE mode is used, all
prefetched messages will stay in
+ the in-memory prefetch buffer until they're received into the
application. If the application ends before
+ the messages are received into the application, those messages will be
lost and unable to be recovered.
+ Therefore, it's recommended that PEEK_LOCK mode be used with prefetch.
:keyword str client_identifier: A string-based identifier to uniquely
identify the client instance.
Service Bus will associate it with some error messages for easier
correlation of errors.
If not specified, a unique id will be generated.
@@ -237,7 +241,9 @@
def __iter__(self):
return self._iter_contextual_wrapper()
- def _inner_next(self, wait_time=None):
+ def _inner_next(
+ self, wait_time: Optional[float] = None
+ ) -> "ServiceBusReceivedMessage":
# We do this weird wrapping such that an imperitive next() call, and a
generator-based iter both trace sanely.
self._check_live()
while True:
@@ -247,11 +253,11 @@
self._message_iter = None
raise
- def __next__(self):
+ def __next__(self) -> ServiceBusReceivedMessage:
# Normally this would wrap the yield of the iter, but for a direct
next call we just trace imperitively.
try:
self._receive_context.set()
- message = self._inner_next()
+ message: ServiceBusReceivedMessage = self._inner_next()
links = get_receive_links(message)
with receive_trace_context_manager(self, links=links):
return message
@@ -295,8 +301,12 @@
performance but increase the chance that messages will expire while
they are cached if they're not
processed fast enough.
The default value is 0, meaning messages will be received from the
service and processed one at a time.
- In the case of prefetch_count being 0, `ServiceBusReceiver.receive`
would try to cache `max_message_count`
- (if provided) within its request to the service.
+ In the case of prefetch_count being 0,
`ServiceBusReceiver.receive_messages` would try to cache
+ `max_message_count` (if provided) within its request to the service.
+ **WARNING: If prefetch_count > 0 and RECEIVE_AND_DELETE mode is used,
all prefetched messages will stay in
+ the in-memory prefetch buffer until they're received into the
application. If the application ends before
+ the messages are received into the application, those messages will
be lost and unable to be recovered.
+ Therefore, it's recommended that PEEK_LOCK mode be used with prefetch.
:rtype: ~azure.servicebus.ServiceBusReceiver
:raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an
issue in token/identity validity.
@@ -336,22 +346,29 @@
timeout=self._max_wait_time * self._amqp_transport.TIMEOUT_FACTOR
if self._max_wait_time
else 0,
- link_credit=self._prefetch_count,
- # If prefetch is 1, then keep_alive coroutine serves as keep
receiving for releasing messages
+ # set link_credit to at least 1 so that messages can be received
+ link_credit=self._prefetch_count + 1,
+ # If prefetch is "off", then keep_alive coroutine frequently
listens on the connection for messages and
+ # releases right away, since no "prefetched" messages should be in
the internal buffer.
keep_alive_interval=self._config.keep_alive
- if self._prefetch_count != 1
+ if self._prefetch_count != 0
else 5,
shutdown_after_timeout=False,
link_properties={CONSUMER_IDENTIFIER: self._name},
)
- if self._prefetch_count == 1:
+ # When prefetch is 0 and receive mode is PEEK_LOCK, release messages
when they're received.
+ # This will stop messages from expiring in the buffer and incrementing
delivery count of a message.
+ # If RECEIVE_AND_DELETE mode, messages are settled and removed from
the Service Bus entity immediately,
+ # so the regular _message_received callback should be used. This will
ensure that all messages are added
+ # to the internal buffer since they cannot be re-received, even if not
received during an active receive call.
+ if self._prefetch_count == 0 and self._receive_mode ==
ServiceBusReceiveMode.PEEK_LOCK:
# pylint: disable=protected-access
self._handler._message_received = functools.partial(
- self._amqp_transport.enhanced_message_received, # type:
ignore[attr-defined]
+ self._amqp_transport.enhanced_message_received,
self
)
- def _open(self):
+ def _open(self) -> None:
# pylint: disable=protected-access
if self._running:
return
@@ -402,11 +419,11 @@
if len(batch) >= max_message_count:
return [self._build_received_message(message) for message in
batch]
- # Dynamically issue link credit if max_message_count > 1 when the
prefetch_count is the default value 1
+ # Dynamically issue link credit if max_message_count >= 1 when the
prefetch_count is the default value 0
if (
max_message_count
- and self._prefetch_count == 1
- and max_message_count > 1
+ and self._prefetch_count == 0
+ and max_message_count >= 1
):
link_credit_needed = max_message_count - len(batch)
self._amqp_transport.reset_link_credit(amqp_receive_client,
link_credit_needed)
@@ -442,7 +459,6 @@
):
batch.append(received_messages_queue.get())
received_messages_queue.task_done()
-
return [self._build_received_message(message) for message in batch]
finally:
self._receive_context.clear()
@@ -663,7 +679,7 @@
if max_message_count is not None and max_message_count <= 0:
raise ValueError("The max_message_count must be greater than 0")
start_time = time.time_ns()
- messages = self._do_retryable_operation(
+ messages: List[ServiceBusReceivedMessage] =
self._do_retryable_operation(
self._receive,
max_message_count=max_message_count,
timeout=max_wait_time,
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/azure/servicebus/_servicebus_sender.py
new/azure-servicebus-7.11.3/azure/servicebus/_servicebus_sender.py
--- old/azure-servicebus-7.11.2/azure/servicebus/_servicebus_sender.py
2023-09-13 21:58:31.000000000 +0200
+++ new/azure-servicebus-7.11.3/azure/servicebus/_servicebus_sender.py
2023-10-11 22:08:58.000000000 +0200
@@ -7,7 +7,7 @@
import uuid
import datetime
import warnings
-from typing import Any, TYPE_CHECKING, Union, List, Optional, Mapping, cast
+from typing import Any, TYPE_CHECKING, Union, List, Optional, Mapping, cast,
Iterable
from ._base_handler import BaseHandler
from ._common import mgmt_handlers
@@ -58,7 +58,9 @@
Mapping[str, Any],
ServiceBusMessage,
AmqpAnnotatedMessage,
- List[Union[Mapping[str, Any], ServiceBusMessage,
AmqpAnnotatedMessage]],
+ Iterable[Mapping[str, Any]],
+ Iterable[ServiceBusMessage],
+ Iterable[AmqpAnnotatedMessage],
]
MessageObjTypes = Union[
ServiceBusMessage,
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/azure/servicebus/_transport/_base.py
new/azure-servicebus-7.11.3/azure/servicebus/_transport/_base.py
--- old/azure-servicebus-7.11.2/azure/servicebus/_transport/_base.py
2023-09-13 21:58:31.000000000 +0200
+++ new/azure-servicebus-7.11.3/azure/servicebus/_transport/_base.py
2023-10-11 22:08:58.000000000 +0200
@@ -238,6 +238,21 @@
@staticmethod
@abstractmethod
+ def enhanced_message_received(*args, **kwargs) -> None: # pylint:
disable=docstring-missing-param,docstring-should-be-keyword
+ """
+ Releases messages from the internal buffer when there is no active
receive call. In PEEKLOCK mode,
+ this helps avoid messages from expiring in the buffer and incrementing
the delivery count of a message.
+
+ Should not be used with RECEIVE_AND_DELETE mode, since those messages
are settled right away and removed
+ from the Service Bus entity.
+
+ :param ~azure.servicebus.ServiceBusReceiver receiver: The receiver
object.
+ :param ~pyamqp.performatives.AttachFrame frame: Required if pyamqp.
+ :param ~uamqp.Message or ~pyamqp.message.Message message: The received
message.
+ """
+
+ @staticmethod
+ @abstractmethod
def build_received_message(receiver, message_type, received):
"""
Build ServiceBusReceivedMessage.
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/azure/servicebus/_transport/_pyamqp_transport.py
new/azure-servicebus-7.11.3/azure/servicebus/_transport/_pyamqp_transport.py
---
old/azure-servicebus-7.11.2/azure/servicebus/_transport/_pyamqp_transport.py
2023-09-13 21:58:31.000000000 +0200
+++
new/azure-servicebus-7.11.3/azure/servicebus/_transport/_pyamqp_transport.py
2023-10-11 22:08:58.000000000 +0200
@@ -666,13 +666,19 @@
receiver._receive_context.clear()
@staticmethod
- def enhanced_message_received(
+ def enhanced_message_received( # pylint: disable=arguments-differ
receiver: "ServiceBusReceiver",
frame: "AttachFrame",
message: "Message"
) -> None:
- """
- Receiver enhanced_message_received callback.
+ """Callback run on receipt of every message.
+
+ Releases messages from the internal buffer when there is no active
receive call. In PEEKLOCK mode,
+ this helps avoid messages from expiring in the buffer and incrementing
the delivery count of a message.
+
+ Should not be used with RECEIVE_AND_DELETE mode, since those messages
are settled right away and removed
+ from the Service Bus entity.
+
:param ~azure.servicebus.ServiceBusReceiver receiver: The receiver
object.
:param ~pyamqp.performatives.AttachFrame frame: The attach frame.
:param ~pyamqp.message.Message message: The received message.
@@ -682,6 +688,7 @@
if receiver._receive_context.is_set():
receiver._handler._received_messages.put((frame, message))
else:
+ # If receive_message or receive iterator is not being called,
release message passed to callback.
receiver._handler.settle_messages(frame[1], 'released')
@staticmethod
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/azure/servicebus/_transport/_uamqp_transport.py
new/azure-servicebus-7.11.3/azure/servicebus/_transport/_uamqp_transport.py
--- old/azure-servicebus-7.11.2/azure/servicebus/_transport/_uamqp_transport.py
2023-09-13 21:58:31.000000000 +0200
+++ new/azure-servicebus-7.11.3/azure/servicebus/_transport/_uamqp_transport.py
2023-10-11 22:08:58.000000000 +0200
@@ -767,7 +767,7 @@
# wait_time used by pyamqp
@staticmethod
def iter_next(
- receiver: "ServiceBusReceiver", wait_time: Optional[int] = None
+ receiver: "ServiceBusReceiver", wait_time: Optional[int] = None,
) -> "ServiceBusReceivedMessage": # pylint: disable=unused-argument
# pylint: disable=protected-access
try:
@@ -791,11 +791,16 @@
receiver._receive_context.clear()
@staticmethod
- def enhanced_message_received(
+ def enhanced_message_received( # pylint: disable=arguments-differ
receiver: "ServiceBusReceiver", message: "Message"
) -> None:
"""
- Receiver enhanced_message_received callback.
+ Releases messages from the internal buffer when there is no active
receive call. In PEEKLOCK mode,
+ this helps avoid messages from expiring in the buffer and
incrementing the delivery count of a message.
+
+ Should not be used with RECEIVE_AND_DELETE mode, since those
messages are settled right away and removed
+ from the Service Bus entity.
+
:param ~azure.servicebus.ServiceBusReceiver receiver: The receiver
object.
:param ~uamqp.Message message: The received message.
"""
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/azure-servicebus-7.11.2/azure/servicebus/_version.py
new/azure-servicebus-7.11.3/azure/servicebus/_version.py
--- old/azure-servicebus-7.11.2/azure/servicebus/_version.py 2023-09-13
21:58:31.000000000 +0200
+++ new/azure-servicebus-7.11.3/azure/servicebus/_version.py 2023-10-11
22:08:58.000000000 +0200
@@ -3,4 +3,4 @@
# Licensed under the MIT License.
# ------------------------------------
-VERSION = "7.11.2"
+VERSION = "7.11.3"
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/azure/servicebus/aio/_servicebus_client_async.py
new/azure-servicebus-7.11.3/azure/servicebus/aio/_servicebus_client_async.py
---
old/azure-servicebus-7.11.2/azure/servicebus/aio/_servicebus_client_async.py
2023-09-13 21:58:31.000000000 +0200
+++
new/azure-servicebus-7.11.3/azure/servicebus/aio/_servicebus_client_async.py
2023-10-11 22:08:58.000000000 +0200
@@ -353,8 +353,12 @@
performance but increase the chance that messages will expire while
they are cached if they're not
processed fast enough.
The default value is 0, meaning messages will be received from the
service and processed one at a time.
- In the case of prefetch_count being 0, `ServiceBusReceiver.receive`
would try to cache `max_message_count`
- (if provided) within its request to the service.
+ In the case of prefetch_count being 0,
`ServiceBusReceiver.receive_messages` would try to cache
+ `max_message_count` (if provided) within its request to the service.
+ **WARNING: If prefetch_count > 0 and RECEIVE_AND_DELETE mode is used,
all prefetched messages will stay in
+ the in-memory prefetch buffer until they're received into the
application. If the application ends before
+ the messages are received into the application, those messages will
be lost and unable to be recovered.
+ Therefore, it's recommended that PEEK_LOCK mode be used with prefetch.
:keyword str client_identifier: A string-based identifier to uniquely
identify the receiver instance.
Service Bus will associate it with some error messages for easier
correlation of errors.
If not specified, a unique id will be generated.
@@ -528,8 +532,12 @@
performance but increase the chance that messages will expire while
they are cached if they're not
processed fast enough.
The default value is 0, meaning messages will be received from the
service and processed one at a time.
- In the case of prefetch_count being 0, `ServiceBusReceiver.receive`
would try to cache `max_message_count`
- (if provided) within its request to the service.
+ In the case of prefetch_count being 0,
`ServiceBusReceiver.receive_messages` would try to cache
+ `max_message_count` (if provided) within its request to the service.
+ **WARNING: If prefetch_count > 0 and RECEIVE_AND_DELETE mode is used,
all prefetched messages will stay in
+ the in-memory prefetch buffer until they're received into the
application. If the application ends before
+ the messages are received into the application, those messages will
be lost and unable to be recovered.
+ Therefore, it's recommended that PEEK_LOCK mode be used with prefetch.
:keyword str client_identifier: A string-based identifier to uniquely
identify the receiver instance.
Service Bus will associate it with some error messages for easier
correlation of errors.
If not specified, a unique id will be generated.
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/azure/servicebus/aio/_servicebus_receiver_async.py
new/azure-servicebus-7.11.3/azure/servicebus/aio/_servicebus_receiver_async.py
---
old/azure-servicebus-7.11.2/azure/servicebus/aio/_servicebus_receiver_async.py
2023-09-13 21:58:31.000000000 +0200
+++
new/azure-servicebus-7.11.3/azure/servicebus/aio/_servicebus_receiver_async.py
2023-10-11 22:08:58.000000000 +0200
@@ -127,8 +127,12 @@
performance but increase the chance that messages will expire while they
are cached if they're not
processed fast enough.
The default value is 0, meaning messages will be received from the
service and processed one at a time.
- In the case of prefetch_count being 0, `ServiceBusReceiver.receive` would
try to cache `max_message_count`
- (if provided) within its request to the service.
+ In the case of prefetch_count being 0,
`ServiceBusReceiver.receive_messages` would try to cache
+ `max_message_count` (if provided) within its request to the service.
+ **WARNING: If prefetch_count > 0 and RECEIVE_AND_DELETE mode is used, all
prefetched messages will stay in
+ the in-memory prefetch buffer until they're received into the
application. If the application ends before
+ the messages are received into the application, those messages will be
lost and unable to be recovered.
+ Therefore, it's recommended that PEEK_LOCK mode be used with prefetch.
:keyword str client_identifier: A string-based identifier to uniquely
identify the client instance.
Service Bus will associate it with some error messages for easier
correlation of errors. If not specified,
a unique id will be generated.
@@ -234,7 +238,7 @@
def __aiter__(self):
return self._iter_contextual_wrapper()
- async def _inner_anext(self, wait_time=None):
+ async def _inner_anext(self, wait_time: Optional[float] = None) ->
ServiceBusReceivedMessage:
# We do this weird wrapping such that an imperitive next() call, and a
generator-based iter both trace sanely.
self._check_live()
while True:
@@ -244,7 +248,7 @@
self._message_iter = None
raise
- async def __anext__(self):
+ async def __anext__(self) -> ServiceBusReceivedMessage:
try:
self._receive_context.set()
message = await self._inner_anext()
@@ -288,8 +292,12 @@
performance but increase the chance that messages will expire while
they are cached if they're not
processed fast enough.
The default value is 0, meaning messages will be received from the
service and processed one at a time.
- In the case of prefetch_count being 0, `ServiceBusReceiver.receive`
would try to cache `max_message_count`
- (if provided) within its request to the service.
+ In the case of prefetch_count being 0,
`ServiceBusReceiver.receive_messages` would try to cache
+ `max_message_count` (if provided) within its request to the service.
+ **WARNING: If prefetch_count > 0 and RECEIVE_AND_DELETE mode is used,
all prefetched messages will stay in
+ the in-memory prefetch buffer until they're received into the
application. If the application ends before
+ the messages are received into the application, those messages will
be lost and unable to be recovered.
+ Therefore, it's recommended that PEEK_LOCK mode be used with prefetch.
:rtype: ~azure.servicebus.aio.ServiceBusReceiver
:raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an
issue in token/identity validity.
@@ -329,19 +337,26 @@
timeout=self._max_wait_time * self._amqp_transport.TIMEOUT_FACTOR
if self._max_wait_time
else 0,
- link_credit=self._prefetch_count,
- # If prefetch is 1, then keep_alive coroutine serves as keep
receiving for releasing messages
+ # set link_credit to at least 1 so that messages can be received
+ link_credit=self._prefetch_count + 1,
+ # If prefetch is 0, then keep_alive coroutine frequently listens
on the connection for messages and
+ # releases right away, since no "prefetched" messages should be in
the internal buffer.
keep_alive_interval=self._config.keep_alive
- if self._prefetch_count != 1
+ if self._prefetch_count != 0
else 5,
shutdown_after_timeout=False,
link_properties = {CONSUMER_IDENTIFIER:self._name}
)
- if self._prefetch_count == 1:
+ # When prefetch is 0 and receive mode is PEEK_LOCK, release messages
when they're received.
+ # This will stop messages from expiring in the buffer and incrementing
delivery count of a message.
+ # If RECEIVE_AND_DELETE mode, messages are settled and removed from
the Service Bus entity immediately,
+ # so the regular _message_received callback should be used. This will
ensure that all messages are added
+ # to the internal buffer since they cannot be re-received, even if not
received during an active receive call.
+ if self._prefetch_count == 0 and self._receive_mode ==
ServiceBusReceiveMode.PEEK_LOCK:
# pylint: disable=protected-access
self._amqp_transport.set_handler_message_received_async(self)
- async def _open(self):
+ async def _open(self) -> None:
# pylint: disable=protected-access
if self._running:
return
@@ -390,8 +405,8 @@
if len(batch) >= max_message_count:
return [self._build_received_message(message) for message in
batch]
- # Dynamically issue link credit if max_message_count > 1 when the
prefetch_count is the default value 1
- if max_message_count and self._prefetch_count == 1 and
max_message_count > 1:
+ # Dynamically issue link credit if max_message_count >= 1 when the
prefetch_count is the default value 0
+ if max_message_count and self._prefetch_count == 0 and
max_message_count >= 1:
link_credit_needed = max_message_count - len(batch)
await
self._amqp_transport.reset_link_credit_async(amqp_receive_client,
link_credit_needed)
@@ -637,7 +652,7 @@
if max_message_count is not None and max_message_count <= 0:
raise ValueError("The max_message_count must be greater than 0")
start_time = time.time_ns()
- messages = await self._do_retryable_operation(
+ messages: List[ServiceBusReceivedMessage] = await
self._do_retryable_operation(
self._receive,
max_message_count=max_message_count,
timeout=max_wait_time,
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/azure/servicebus/aio/_servicebus_sender_async.py
new/azure-servicebus-7.11.3/azure/servicebus/aio/_servicebus_sender_async.py
---
old/azure-servicebus-7.11.2/azure/servicebus/aio/_servicebus_sender_async.py
2023-09-13 21:58:31.000000000 +0200
+++
new/azure-servicebus-7.11.3/azure/servicebus/aio/_servicebus_sender_async.py
2023-10-11 22:08:58.000000000 +0200
@@ -6,7 +6,7 @@
import asyncio
import datetime
import warnings
-from typing import Any, TYPE_CHECKING, Union, List, Optional, Mapping, cast
+from typing import Any, TYPE_CHECKING, Union, List, Optional, Mapping, cast,
Iterable
from azure.core.credentials import AzureSasCredential, AzureNamedKeyCredential
@@ -53,7 +53,9 @@
Mapping[str, Any],
ServiceBusMessage,
AmqpAnnotatedMessage,
- List[Union[Mapping[str, Any], ServiceBusMessage, AmqpAnnotatedMessage]],
+ Iterable[Mapping[str, Any]],
+ Iterable[ServiceBusMessage],
+ Iterable[AmqpAnnotatedMessage],
]
MessageObjTypes = Union[
ServiceBusMessage,
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/azure/servicebus/aio/_transport/_pyamqp_transport_async.py
new/azure-servicebus-7.11.3/azure/servicebus/aio/_transport/_pyamqp_transport_async.py
---
old/azure-servicebus-7.11.2/azure/servicebus/aio/_transport/_pyamqp_transport_async.py
2023-09-13 21:58:31.000000000 +0200
+++
new/azure-servicebus-7.11.3/azure/servicebus/aio/_transport/_pyamqp_transport_async.py
2023-10-11 22:08:58.000000000 +0200
@@ -204,8 +204,8 @@
receiver: "ServiceBusReceiver", max_wait_time: Optional[int] = None
) -> AsyncIterator["ServiceBusReceivedMessage"]:
while True:
+ # pylint: disable=protected-access
try:
- # pylint: disable=protected-access
message = await receiver._inner_anext(wait_time=max_wait_time)
links = get_receive_links(message)
with receive_trace_context_manager(receiver, links=links):
@@ -241,6 +241,18 @@
frame: "AttachFrame",
message: "Message"
) -> None:
+ """Callback run on receipt of every message.
+
+ Releases messages from the internal buffer when there is no active
receive call. In PEEKLOCK mode,
+ this helps avoid messages from expiring in the buffer and incrementing
the delivery count of a message.
+
+ Should not be used with RECEIVE_AND_DELETE mode, since those messages
are settled right away and removed
+ from the Service Bus entity.
+
+ :param ~azure.servicebus.aio.ServiceBusReceiver receiver: The receiver
object.
+ :param ~pyamqp.performatives.AttachFrame frame: The attach frame.
+ :param ~pyamqp.message.Message message: The received message.
+ """
# pylint: disable=protected-access
receiver._handler._last_activity_timestamp = time.time()
if receiver._receive_context.is_set():
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/azure/servicebus/amqp/_amqp_message.py
new/azure-servicebus-7.11.3/azure/servicebus/amqp/_amqp_message.py
--- old/azure-servicebus-7.11.2/azure/servicebus/amqp/_amqp_message.py
2023-09-13 21:58:31.000000000 +0200
+++ new/azure-servicebus-7.11.3/azure/servicebus/amqp/_amqp_message.py
2023-10-11 22:08:58.000000000 +0200
@@ -87,32 +87,32 @@
for more information on the message format.
:keyword data_body: The body consists of one or more data sections and
each section contains opaque binary data.
- :paramtype data_body: Union[str, bytes, List[Union[str, bytes]]]
+ :paramtype data_body: str or bytes or list[str or bytes]
:keyword sequence_body: The body consists of one or more sequence sections
and
each section contains an arbitrary number of structured data elements.
- :paramtype sequence_body: List[Any]
+ :paramtype sequence_body: list[any]
:keyword value_body: The body consists of one amqp-value section and the
section contains a single AMQP value.
- :paramtype value_body: Any
+ :paramtype value_body: any
:keyword header: The amqp message header.
- :paramtype header: Optional[~azure.servicebus.amqp.AmqpMessageHeader]
+ :paramtype header: ~azure.servicebus.amqp.AmqpMessageHeader or
mapping[str, any] or None
:keyword footer: The amqp message footer.
- :paramtype footer: Optional[Dict]
+ :paramtype footer: dict or None
:keyword properties: Properties to add to the amqp message.
- :paramtype properties:
Optional[~azure.servicebus.amqp.AmqpMessageProperties]
+ :paramtype properties: ~azure.servicebus.amqp.AmqpMessageProperties or
mapping[str, any] or None
:keyword application_properties: Service specific application properties.
- :paramtype application_properties: Optional[Dict]
+ :paramtype application_properties: dict or None
:keyword annotations: Service specific message annotations.
- :paramtype annotations: Optional[Dict]
+ :paramtype annotations: dict or None
:keyword delivery_annotations: Service specific delivery annotations.
- :paramtype delivery_annotations: Optional[Dict]
+ :paramtype delivery_annotations: dict or None
"""
def __init__(
self,
*,
- header: Optional["AmqpMessageHeader"] = None,
+ header: Optional[Union["AmqpMessageHeader", Mapping[str, Any]]] = None,
footer: Optional[Dict[str, Any]] = None,
- properties: Optional["AmqpMessageProperties"] = None,
+ properties: Optional[Union["AmqpMessageProperties", Mapping[str,
Any]]] = None,
application_properties: Optional[Dict[str, Any]] = None,
annotations: Optional[Dict[str, Any]] = None,
delivery_annotations: Optional[Dict[str, Any]] = None,
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/azure_servicebus.egg-info/PKG-INFO
new/azure-servicebus-7.11.3/azure_servicebus.egg-info/PKG-INFO
--- old/azure-servicebus-7.11.2/azure_servicebus.egg-info/PKG-INFO
2023-09-13 21:59:31.000000000 +0200
+++ new/azure-servicebus-7.11.3/azure_servicebus.egg-info/PKG-INFO
2023-10-11 22:10:08.000000000 +0200
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: azure-servicebus
-Version: 7.11.2
+Version: 7.11.3
Summary: Microsoft Azure Service Bus Client Library for Python
Home-page: https://github.com/Azure/azure-sdk-for-python
Author: Microsoft Corporation
@@ -47,10 +47,6 @@
**NOTE**: If you are using version 0.50 or lower and want to migrate to the
latest version
of this package please look at our [migration guide to move from Service Bus
V0.50 to Service Bus V7][migration_guide].
-## _Disclaimer_
-
-_Azure SDK Python packages support for Python 2.7 ended 01 January 2022. For
more information and questions, please refer to
https://github.com/Azure/azure-sdk-for-python/issues/20691_
-
## Getting started
### Install the package
@@ -122,6 +118,10 @@
* [ServiceBusMessage][message_reference]: When sending, this is the type you
will construct to contain your payload. When receiving, this is where you will
access the payload.
+### Thread safety
+
+We do not guarantee that the ServiceBusClient, ServiceBusSender, and
ServiceBusReceiver are thread-safe. We do not recommend reusing these instances
across threads. It is up to the running application to use these classes in a
thread-safe manner.
+
## Examples
The following sections provide several code snippets covering some of the most
common Service Bus tasks, including:
@@ -607,6 +607,12 @@
# Release History
+## 7.11.3 (2023-10-11)
+
+### Bugs Fixed
+
+- Fixed a bug where `prefetch_count` was not being passed through correctly
and caused messages to not be received as expected when in `RECEIVE_AND_DELETE`
mode ([#31712](https://github.com/Azure/azure-sdk-for-python/issues/31712),
[#31711](https://github.com/Azure/azure-sdk-for-python/issues/31711)).
+
## 7.11.2 (2023-09-13)
### Bugs Fixed
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/azure-servicebus-7.11.2/pyproject.toml
new/azure-servicebus-7.11.3/pyproject.toml
--- old/azure-servicebus-7.11.2/pyproject.toml 2023-09-13 21:58:31.000000000
+0200
+++ new/azure-servicebus-7.11.3/pyproject.toml 2023-10-11 22:08:58.000000000
+0200
@@ -1,5 +1,5 @@
[tool.azure-sdk-build]
pyright = false
-type_check_samples = false
-verifytypes = false
+type_check_samples = true
+verifytypes = true
pylint = true
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/samples/async_samples/authenticate_using_azure_sas_credential_async.py
new/azure-servicebus-7.11.3/samples/async_samples/authenticate_using_azure_sas_credential_async.py
---
old/azure-servicebus-7.11.2/samples/async_samples/authenticate_using_azure_sas_credential_async.py
2023-09-13 21:58:31.000000000 +0200
+++
new/azure-servicebus-7.11.3/samples/async_samples/authenticate_using_azure_sas_credential_async.py
2023-10-11 22:08:58.000000000 +0200
@@ -14,10 +14,8 @@
import hashlib
import base64
import asyncio
-try:
- from urllib.parse import quote as url_parse_quote
-except ImportError:
- from urllib import pathname2url as url_parse_quote
+from urllib.parse import quote as url_parse_quote
+
from azure.core.credentials import AzureSasCredential
from azure.servicebus import ServiceBusMessage
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/samples/async_samples/auto_lock_renew_async.py
new/azure-servicebus-7.11.3/samples/async_samples/auto_lock_renew_async.py
--- old/azure-servicebus-7.11.2/samples/async_samples/auto_lock_renew_async.py
2023-09-13 21:58:31.000000000 +0200
+++ new/azure-servicebus-7.11.3/samples/async_samples/auto_lock_renew_async.py
2023-10-11 22:08:58.000000000 +0200
@@ -9,6 +9,10 @@
Example to show usage of AutoLockRenewer asynchronously:
1. Automatically renew locks on messages received from non-sessionful
entity
2. Automatically renew locks on the session of sessionful entity
+
+We do not guarantee that this SDK is thread-safe. We do not recommend reusing
the ServiceBusClient,
+ ServiceBusSender, ServiceBusReceiver across threads. It is up to the running
+ application to use these classes in a thread-safe manner.
"""
import os
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/samples/async_samples/receive_deferred_message_queue_async.py
new/azure-servicebus-7.11.3/samples/async_samples/receive_deferred_message_queue_async.py
---
old/azure-servicebus-7.11.2/samples/async_samples/receive_deferred_message_queue_async.py
2023-09-13 21:58:31.000000000 +0200
+++
new/azure-servicebus-7.11.3/samples/async_samples/receive_deferred_message_queue_async.py
2023-10-11 22:08:58.000000000 +0200
@@ -34,7 +34,8 @@
deferred_sequenced_numbers = []
for msg in received_msgs:
print("Deferring msg: {}".format(str(msg)))
- deferred_sequenced_numbers.append(msg.sequence_number)
+ if msg.sequence_number:
+ deferred_sequenced_numbers.append(msg.sequence_number)
await receiver.defer_message(msg)
if deferred_sequenced_numbers:
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/samples/async_samples/sample_code_servicebus_async.py
new/azure-servicebus-7.11.3/samples/async_samples/sample_code_servicebus_async.py
---
old/azure-servicebus-7.11.2/samples/async_samples/sample_code_servicebus_async.py
2023-09-13 21:58:31.000000000 +0200
+++
new/azure-servicebus-7.11.3/samples/async_samples/sample_code_servicebus_async.py
2023-10-11 22:08:58.000000000 +0200
@@ -160,8 +160,8 @@
servicebus_sender = await example_create_servicebus_sender_async()
# [START send_async]
async with servicebus_sender:
- message = ServiceBusMessage("Hello World")
- await servicebus_sender.send_messages(message)
+ message_send = ServiceBusMessage("Hello World")
+ await servicebus_sender.send_messages(message_send)
# [END send_async]
await servicebus_sender.send_messages([ServiceBusMessage("Hello
World")] * 5)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/samples/async_samples/send_and_receive_amqp_annotated_message_async.py
new/azure-servicebus-7.11.3/samples/async_samples/send_and_receive_amqp_annotated_message_async.py
---
old/azure-servicebus-7.11.2/samples/async_samples/send_and_receive_amqp_annotated_message_async.py
2023-09-13 21:58:31.000000000 +0200
+++
new/azure-servicebus-7.11.3/samples/async_samples/send_and_receive_amqp_annotated_message_async.py
2023-10-11 22:08:58.000000000 +0200
@@ -11,7 +11,7 @@
import os
import asyncio
-from azure.servicebus.amqp import AmqpAnnotatedMessage, AmqpMessageBodyType
+from azure.servicebus.amqp import AmqpAnnotatedMessage, AmqpMessageBodyType,
AmqpMessageProperties, AmqpMessageHeader
from azure.servicebus.aio import ServiceBusClient
CONNECTION_STR = os.environ['SERVICEBUS_CONNECTION_STR']
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/samples/sync_samples/authenticate_using_azure_sas_credential.py
new/azure-servicebus-7.11.3/samples/sync_samples/authenticate_using_azure_sas_credential.py
---
old/azure-servicebus-7.11.2/samples/sync_samples/authenticate_using_azure_sas_credential.py
2023-09-13 21:58:31.000000000 +0200
+++
new/azure-servicebus-7.11.3/samples/sync_samples/authenticate_using_azure_sas_credential.py
2023-10-11 22:08:58.000000000 +0200
@@ -13,10 +13,8 @@
import hmac
import hashlib
import base64
-try:
- from urllib.parse import quote as url_parse_quote
-except ImportError:
- from urllib import pathname2url as url_parse_quote
+from urllib.parse import quote as url_parse_quote
+
from azure.core.credentials import AzureSasCredential
from azure.servicebus import ServiceBusClient, ServiceBusMessage
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/samples/sync_samples/auto_lock_renew.py
new/azure-servicebus-7.11.3/samples/sync_samples/auto_lock_renew.py
--- old/azure-servicebus-7.11.2/samples/sync_samples/auto_lock_renew.py
2023-09-13 21:58:31.000000000 +0200
+++ new/azure-servicebus-7.11.3/samples/sync_samples/auto_lock_renew.py
2023-10-11 22:08:58.000000000 +0200
@@ -9,6 +9,10 @@
Example to show usage of AutoLockRenewer:
1. Automatically renew locks on messages received from non-sessionful
entity
2. Automatically renew locks on the session of sessionful entity
+
+We do not guarantee that this SDK is thread-safe. We do not recommend reusing
the ServiceBusClient,
+ ServiceBusSender, ServiceBusReceiver across threads. It is up to the running
+ application to use these classes in a thread-safe manner.
"""
import os
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/samples/sync_samples/generate_sas_token_and_authenticate_client.py
new/azure-servicebus-7.11.3/samples/sync_samples/generate_sas_token_and_authenticate_client.py
---
old/azure-servicebus-7.11.2/samples/sync_samples/generate_sas_token_and_authenticate_client.py
2023-09-13 21:58:31.000000000 +0200
+++
new/azure-servicebus-7.11.3/samples/sync_samples/generate_sas_token_and_authenticate_client.py
2023-10-11 22:08:58.000000000 +0200
@@ -13,10 +13,7 @@
import hmac
import hashlib
import base64
-try:
- from urllib.parse import quote as url_parse_quote
-except ImportError:
- from urllib import pathname2url as url_parse_quote
+from urllib.parse import quote as url_parse_quote
from azure.core.credentials import AccessToken
from azure.servicebus import ServiceBusClient
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/samples/sync_samples/receive_deferred_message_queue.py
new/azure-servicebus-7.11.3/samples/sync_samples/receive_deferred_message_queue.py
---
old/azure-servicebus-7.11.2/samples/sync_samples/receive_deferred_message_queue.py
2023-09-13 21:58:31.000000000 +0200
+++
new/azure-servicebus-7.11.3/samples/sync_samples/receive_deferred_message_queue.py
2023-10-11 22:08:58.000000000 +0200
@@ -10,6 +10,7 @@
"""
import os
+from typing import List
from azure.servicebus import ServiceBusMessage, ServiceBusClient
CONNECTION_STR = os.environ['SERVICEBUS_CONNECTION_STR']
@@ -29,7 +30,8 @@
deferred_sequenced_numbers = []
for msg in received_msgs:
print("Deferring msg: {}".format(str(msg)))
- deferred_sequenced_numbers.append(msg.sequence_number)
+ if msg.sequence_number:
+ deferred_sequenced_numbers.append(msg.sequence_number)
receiver.defer_message(msg)
if deferred_sequenced_numbers:
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/samples/sync_samples/sample_code_servicebus.py
new/azure-servicebus-7.11.3/samples/sync_samples/sample_code_servicebus.py
--- old/azure-servicebus-7.11.2/samples/sync_samples/sample_code_servicebus.py
2023-09-13 21:58:31.000000000 +0200
+++ new/azure-servicebus-7.11.3/samples/sync_samples/sample_code_servicebus.py
2023-10-11 22:08:58.000000000 +0200
@@ -154,8 +154,8 @@
servicebus_sender = example_create_servicebus_sender_sync()
# [START send_sync]
with servicebus_sender:
- message = ServiceBusMessage("Hello World")
- servicebus_sender.send_messages(message)
+ message_send = ServiceBusMessage("Hello World")
+ servicebus_sender.send_messages(message_send)
# [END send_sync]
servicebus_sender.send_messages([ServiceBusMessage("Hello World")] * 5)
@@ -167,7 +167,7 @@
# [END create_batch_sync]
# [START send_complex_message]
- message = ServiceBusMessage(
+ message_send = ServiceBusMessage(
"Hello World!!",
session_id="MySessionID",
application_properties={'data': 'custom_data'},
@@ -179,8 +179,8 @@
servicebus_receiver = example_create_servicebus_receiver_sync()
# [START peek_messages_sync]
with servicebus_receiver:
- messages = servicebus_receiver.peek_messages()
- for message in messages:
+ messages_peek = servicebus_receiver.peek_messages()
+ for message in messages_peek:
print(str(message))
# [END peek_messages_sync]
@@ -200,15 +200,17 @@
servicebus_receiver = example_create_servicebus_receiver_sync()
# [START receive_sync]
with servicebus_receiver:
- messages = servicebus_receiver.receive_messages(max_wait_time=5)
- for message in messages:
+ messages_sync = servicebus_receiver.receive_messages(max_wait_time=5)
+ for message in messages_sync:
print(str(message))
servicebus_receiver.complete_message(message)
# [END receive_sync]
# [START receive_complex_message]
- messages = servicebus_receiver.receive_messages(max_wait_time=5)
- for message in messages:
+ from typing import List
+ from azure.servicebus import ServiceBusReceivedMessage
+ messages_complex: List[ServiceBusReceivedMessage] =
servicebus_receiver.receive_messages(max_wait_time=5)
+ for message in messages_complex:
print("Receiving: {}".format(message))
print("Time to live: {}".format(message.time_to_live))
print("Sequence number: {}".format(message.sequence_number))
@@ -223,32 +225,32 @@
# [END receive_complex_message]
# [START abandon_message_sync]
- messages = servicebus_receiver.receive_messages(max_wait_time=5)
- for message in messages:
+ messages_abandon =
servicebus_receiver.receive_messages(max_wait_time=5)
+ for message in messages_abandon:
servicebus_receiver.abandon_message(message)
# [END abandon_message_sync]
# [START complete_message_sync]
- messages = servicebus_receiver.receive_messages(max_wait_time=5)
- for message in messages:
+ messages_complete =
servicebus_receiver.receive_messages(max_wait_time=5)
+ for message in messages_complete:
servicebus_receiver.complete_message(message)
# [END complete_message_sync]
# [START defer_message_sync]
- messages = servicebus_receiver.receive_messages(max_wait_time=5)
- for message in messages:
+ messages_defer = servicebus_receiver.receive_messages(max_wait_time=5)
+ for message in messages_defer:
servicebus_receiver.defer_message(message)
# [END defer_message_sync]
# [START dead_letter_message_sync]
- messages = servicebus_receiver.receive_messages(max_wait_time=5)
- for message in messages:
+ messages_dead_letter =
servicebus_receiver.receive_messages(max_wait_time=5)
+ for message in messages_dead_letter:
servicebus_receiver.dead_letter_message(message)
# [END dead_letter_message_sync]
# [START renew_message_lock_sync]
- messages = servicebus_receiver.receive_messages(max_wait_time=5)
- for message in messages:
+ messages_lock = servicebus_receiver.receive_messages(max_wait_time=5)
+ for message in messages_lock:
servicebus_receiver.renew_message_lock(message)
# [END renew_message_lock_sync]
@@ -270,8 +272,8 @@
# [START receive_defer_sync]
with servicebus_receiver:
deferred_sequenced_numbers = []
- messages = servicebus_receiver.receive_messages(max_wait_time=5)
- for message in messages:
+ messages_defer = servicebus_receiver.receive_messages(max_wait_time=5)
+ for message in messages_defer:
deferred_sequenced_numbers.append(message.sequence_number)
print(str(message))
servicebus_receiver.defer_message(message)
@@ -304,8 +306,8 @@
)
with servicebus_client.get_queue_receiver(queue_name,
sub_queue=ServiceBusSubQueue.DEAD_LETTER) as servicebus_deadletter_receiver:
- messages =
servicebus_deadletter_receiver.receive_messages(max_wait_time=5)
- for message in messages:
+ messages_deadletter =
servicebus_deadletter_receiver.receive_messages(max_wait_time=5)
+ for message in messages_deadletter:
servicebus_deadletter_receiver.complete_message(message)
# [END receive_deadletter_sync]
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/samples/sync_samples/session_pool_receive.py
new/azure-servicebus-7.11.3/samples/sync_samples/session_pool_receive.py
--- old/azure-servicebus-7.11.2/samples/sync_samples/session_pool_receive.py
2023-09-13 21:58:31.000000000 +0200
+++ new/azure-servicebus-7.11.3/samples/sync_samples/session_pool_receive.py
2023-10-11 22:08:58.000000000 +0200
@@ -7,6 +7,7 @@
import os
import uuid
import concurrent
+from typing import List
from azure.servicebus import ServiceBusClient, ServiceBusMessage,
AutoLockRenewer, NEXT_AVAILABLE_SESSION
from azure.servicebus.exceptions import OperationTimeoutError
@@ -56,7 +57,7 @@
message = ServiceBusMessage("Sample message no.
{}".format(i), session_id=session_id)
sender.send_messages(message)
- all_messages = []
+ all_messages: List = []
futures = []
with
concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_receivers) as
thread_pool:
for _ in range(concurrent_receivers):
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/tests/async_tests/test_queues_async.py
new/azure-servicebus-7.11.3/tests/async_tests/test_queues_async.py
--- old/azure-servicebus-7.11.2/tests/async_tests/test_queues_async.py
2023-09-13 21:58:31.000000000 +0200
+++ new/azure-servicebus-7.11.3/tests/async_tests/test_queues_async.py
2023-10-11 22:08:58.000000000 +0200
@@ -509,7 +509,74 @@
messages.append(message)
assert len(messages) == 0
-
+ @pytest.mark.asyncio
+ @pytest.mark.liveTest
+ @pytest.mark.live_test_only
+ @CachedServiceBusResourceGroupPreparer(name_prefix='servicebustest')
+ @CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
+ @ServiceBusQueuePreparer(name_prefix='servicebustest',
dead_lettering_on_message_expiration=True, lock_duration='PT10S')
+ @pytest.mark.parametrize("uamqp_transport", uamqp_transport_params,
ids=uamqp_transport_ids)
+ @ArgPasserAsync()
+ async def
test_async_queue_by_queue_client_conn_str_receive_handler_receiveanddelete_prefetch(self,
uamqp_transport, *, servicebus_namespace_connection_string=None,
servicebus_queue=None, **kwargs):
+ async with ServiceBusClient.from_connection_string(
+ servicebus_namespace_connection_string, logging_enable=False,
uamqp_transport=uamqp_transport) as sb_client:
+
+ # send 10 messages
+ async with sb_client.get_queue_sender(servicebus_queue.name) as
sender:
+ for i in range(10):
+ message = ServiceBusMessage("Handler message no.
{}".format(i))
+ await sender.send_messages(message)
+
+ # check peek_messages returns correctly, with default
prefetch_count = 0
+ messages = []
+ async with sb_client.get_queue_receiver(servicebus_queue.name,
+
receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE,
+ max_wait_time=10) as receiver:
+ # peek messages checks current state of queue, which should
return 10
+ # since none were prefetched, added to internal buffer, and
deleted on receive
+ peeked_msgs = await
receiver.peek_messages(max_message_count=10, timeout=10)
+ assert len(peeked_msgs) == 10
+
+ # iterator receives and deletes each message from SB queue
+ async for msg in receiver:
+ messages.append(msg)
+ assert len(messages) == 10
+
+ # queue should be empty now
+ peeked_msgs = await
receiver.peek_messages(max_message_count=10, timeout=10)
+ assert len(peeked_msgs) == 0
+
+ # send 10 messages
+ async with sb_client.get_queue_sender(servicebus_queue.name) as
sender:
+ for i in range(10):
+ message = ServiceBusMessage("Handler message no.
{}".format(i))
+ await sender.send_messages(message)
+
+ # check peek_messages returns correctly, with default
prefetch_count > 0
+ messages = []
+ # prefetch 1 message from SB queue when receive is called and not
on open
+ async with sb_client.get_queue_receiver(servicebus_queue.name,
+
receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE,
+ prefetch_count=1,
+ max_wait_time=30) as receiver:
+ # peek messages checks current state of SB queue, and returns
10
+ peeked_msgs = await
receiver.peek_messages(max_message_count=10, timeout=10)
+ assert len(peeked_msgs) == 10
+
+ # receive 3 messages
+ recvd_msgs = await
receiver.receive_messages(max_message_count=3, max_wait_time=10)
+ assert len(recvd_msgs) == 3
+
+ # receive rest of messages in queue
+ async for msg in receiver:
+ messages.append(msg)
+ assert len(messages) == 7
+
+ # queue should be empty now
+ peeked_msgs = await
receiver.peek_messages(max_message_count=10, timeout=10)
+ assert len(peeked_msgs) == 0
+
+
@pytest.mark.asyncio
@pytest.mark.liveTest
@pytest.mark.live_test_only
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/tests/async_tests/test_sessions_async.py
new/azure-servicebus-7.11.3/tests/async_tests/test_sessions_async.py
--- old/azure-servicebus-7.11.2/tests/async_tests/test_sessions_async.py
2023-09-13 21:58:31.000000000 +0200
+++ new/azure-servicebus-7.11.3/tests/async_tests/test_sessions_async.py
2023-10-11 22:08:58.000000000 +0200
@@ -629,6 +629,9 @@
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params,
ids=uamqp_transport_ids)
@ArgPasserAsync()
async def
test_async_session_by_conn_str_receive_handler_with_auto_autolockrenew(self,
uamqp_transport, *, servicebus_namespace_connection_string=None,
servicebus_queue=None, **kwargs):
+ if sys.platform.startswith('darwin'):
+ pytest.skip("Skipping for flakiness on OSX. Need to fix and unskip
during MQ. Issue created: #32067.")
+
async with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, logging_enable=False,
uamqp_transport=uamqp_transport) as sb_client:
session_id = str(uuid.uuid4())
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/azure-servicebus-7.11.2/tests/async_tests/test_subscriptions_async.py
new/azure-servicebus-7.11.3/tests/async_tests/test_subscriptions_async.py
--- old/azure-servicebus-7.11.2/tests/async_tests/test_subscriptions_async.py
2023-09-13 21:58:31.000000000 +0200
+++ new/azure-servicebus-7.11.3/tests/async_tests/test_subscriptions_async.py
2023-10-11 22:08:58.000000000 +0200
@@ -7,6 +7,7 @@
import logging
import sys
import os
+import asyncio
import pytest
import time
from datetime import datetime, timedelta
@@ -231,3 +232,44 @@
assert len(messages) == 1
assert messages[0].delivery_count > 0
await receiver.complete_message(messages[0])
+
+ @pytest.mark.asyncio
+ @pytest.mark.liveTest
+ @pytest.mark.live_test_only
+ @CachedServiceBusResourceGroupPreparer(name_prefix='servicebustest')
+ @CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
+ @ServiceBusTopicPreparer(name_prefix='servicebustest')
+ @ServiceBusSubscriptionPreparer(name_prefix='servicebustest',
lock_duration='PT5S')
+ @pytest.mark.parametrize("uamqp_transport", uamqp_transport_params,
ids=uamqp_transport_ids)
+ @ArgPasserAsync()
+ async def test_subscription_receive_and_delete_with_send_and_wait(self,
uamqp_transport, *, servicebus_namespace=None,
servicebus_namespace_key_name=None, servicebus_namespace_primary_key=None,
servicebus_topic=None, servicebus_subscription=None, **kwargs):
+ fully_qualified_namespace =
f"{servicebus_namespace.name}{SERVICEBUS_ENDPOINT_SUFFIX}"
+ async with ServiceBusClient(
+ fully_qualified_namespace=fully_qualified_namespace,
+ credential=ServiceBusSharedKeyCredential(
+ policy=servicebus_namespace_key_name,
+ key=servicebus_namespace_primary_key
+ ),
+ logging_enable=False,
+ uamqp_transport=uamqp_transport
+ ) as sb_client:
+
+ sender =
sb_client.get_topic_sender(topic_name=servicebus_topic.name)
+ receiver = sb_client.get_subscription_receiver(
+ topic_name=servicebus_topic.name,
+ subscription_name=servicebus_subscription.name,
+ receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE,
+ )
+ async with sender, receiver:
+ # queue should be empty
+ received_msgs = await
receiver.receive_messages(max_message_count=10, max_wait_time=10)
+ assert len(received_msgs) == 0
+
+ messages = [ServiceBusMessage("Message") for _ in range(10)]
+ await sender.send_messages(messages)
+ # wait for all messages to be sent to queue
+ await asyncio.sleep(10)
+
+ # receive messages + add to internal buffer should have
messages now
+ received_msgs = await
receiver.receive_messages(max_message_count=10, max_wait_time=10)
+ assert len(received_msgs) == 10
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/azure-servicebus-7.11.2/tests/test_queues.py
new/azure-servicebus-7.11.3/tests/test_queues.py
--- old/azure-servicebus-7.11.2/tests/test_queues.py 2023-09-13
21:58:31.000000000 +0200
+++ new/azure-servicebus-7.11.3/tests/test_queues.py 2023-10-11
22:08:58.000000000 +0200
@@ -521,6 +521,7 @@
else:
with pytest.raises(TypeError):
pickled = pickle.loads(pickle.dumps(messages[0]))
+
@pytest.mark.liveTest
@pytest.mark.live_test_only
@@ -573,8 +574,74 @@
for message in receiver:
messages.append(message)
assert len(messages) == 0
-
-
+
+ @pytest.mark.liveTest
+ @pytest.mark.live_test_only
+ @CachedServiceBusResourceGroupPreparer(name_prefix='servicebustest')
+ @CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
+ @ServiceBusQueuePreparer(name_prefix='servicebustest',
dead_lettering_on_message_expiration=True, lock_duration='PT10S')
+ @pytest.mark.parametrize("uamqp_transport", uamqp_transport_params,
ids=uamqp_transport_ids)
+ @ArgPasser()
+ def
test_queue_by_queue_client_conn_str_receive_handler_receiveanddelete_prefetch(self,
uamqp_transport, *, servicebus_namespace_connection_string=None,
servicebus_queue=None, **kwargs):
+ with ServiceBusClient.from_connection_string(
+ servicebus_namespace_connection_string, logging_enable=False,
uamqp_transport=uamqp_transport) as sb_client:
+ # send 10 messages
+ with sb_client.get_queue_sender(servicebus_queue.name) as sender:
+ for i in range(10):
+ message = ServiceBusMessage("Handler message no.
{}".format(i))
+ sender.send_messages(message)
+
+ # check peek_messages returns correctly, with default
prefetch_count = 0
+ messages = []
+ with sb_client.get_queue_receiver(servicebus_queue.name,
+
receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE,
+ max_wait_time=10) as receiver:
+ # peek messages checks current state of queue, which should
return 10
+ # since none were prefetched, added to internal queue, and
deleted
+ peeked_msgs = receiver.peek_messages(max_message_count=10,
timeout=10)
+ assert len(peeked_msgs) == 10
+
+ # iterator receives and deletes each message from SB queue
+ for msg in receiver:
+ messages.append(msg)
+ assert len(messages) == 10
+
+ # queue should be empty now
+ peeked_msgs = receiver.peek_messages(max_message_count=10,
timeout=10)
+ assert len(peeked_msgs) == 0
+
+ # send 10 messages
+ with sb_client.get_queue_sender(servicebus_queue.name) as sender:
+ for i in range(10):
+ message = ServiceBusMessage("Handler message no.
{}".format(i))
+ sender.send_messages(message)
+
+ # check peek_messages returns correctly, with default
prefetch_count > 0
+ messages = []
+ # prefetch 2 messages from SB queue when receive is called and not
on open
+ with sb_client.get_queue_receiver(
+ servicebus_queue.name,
+ receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE,
+ prefetch_count=2,
+ max_wait_time=30
+ ) as receiver:
+ # peek messages checks current state of SB queue, and returns
10
+ peeked_msgs = receiver.peek_messages(max_message_count=10,
timeout=10)
+ assert len(peeked_msgs) == 10
+
+ # receive 3 messages
+ recvd_msgs = receiver.receive_messages(max_message_count=3,
max_wait_time=10)
+ assert len(recvd_msgs) == 3
+
+ # receive rest of messages in queue
+ for msg in receiver:
+ messages.append(msg)
+ assert len(messages) == 7
+
+ # queue should be empty now
+ peeked_msgs = receiver.peek_messages(max_message_count=10,
timeout=10)
+ assert len(peeked_msgs) == 0
+
@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedServiceBusResourceGroupPreparer(name_prefix='servicebustest')
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/azure-servicebus-7.11.2/tests/test_subscriptions.py
new/azure-servicebus-7.11.3/tests/test_subscriptions.py
--- old/azure-servicebus-7.11.2/tests/test_subscriptions.py 2023-09-13
21:58:31.000000000 +0200
+++ new/azure-servicebus-7.11.3/tests/test_subscriptions.py 2023-10-11
22:08:58.000000000 +0200
@@ -253,3 +253,44 @@
assert len(messages) == 1
assert messages[0].delivery_count > 0
receiver.complete_message(messages[0])
+
+ @pytest.mark.liveTest
+ @pytest.mark.live_test_only
+ @CachedServiceBusResourceGroupPreparer(name_prefix='servicebustest')
+ @CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
+ @ServiceBusTopicPreparer(name_prefix='servicebustest')
+ @ServiceBusSubscriptionPreparer(name_prefix='servicebustest',
lock_duration='PT5S')
+ @pytest.mark.parametrize("uamqp_transport", uamqp_transport_params,
ids=uamqp_transport_ids)
+ @ArgPasser()
+ def test_subscription_receive_and_delete_with_send_and_wait(self,
uamqp_transport, *, servicebus_namespace=None,
servicebus_namespace_key_name=None, servicebus_namespace_primary_key=None,
servicebus_topic=None, servicebus_subscription=None, **kwargs):
+ fully_qualified_namespace =
f"{servicebus_namespace.name}{SERVICEBUS_ENDPOINT_SUFFIX}"
+ with ServiceBusClient(
+ fully_qualified_namespace=fully_qualified_namespace,
+ credential=ServiceBusSharedKeyCredential(
+ policy=servicebus_namespace_key_name,
+ key=servicebus_namespace_primary_key
+ ),
+ logging_enable=False,
+ uamqp_transport=uamqp_transport
+ ) as sb_client:
+
+ sender =
sb_client.get_topic_sender(topic_name=servicebus_topic.name)
+ receiver = sb_client.get_subscription_receiver(
+ topic_name=servicebus_topic.name,
+ subscription_name=servicebus_subscription.name,
+ receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE,
+ )
+ with sender, receiver:
+ # queue should be empty
+ received_msgs =
receiver.receive_messages(max_message_count=10, max_wait_time=10)
+ assert len(received_msgs) == 0
+
+ messages = [ServiceBusMessage("Message") for _ in range(10)]
+ sender.send_messages(messages)
+ # wait for all messages to be sent to queue
+ time.sleep(10)
+
+ # receive messages + add to internal buffer should have
messages now
+ received_msgs =
receiver.receive_messages(max_message_count=10, max_wait_time=10)
+ assert len(received_msgs) == 10
+
\ No newline at end of file
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/azure-servicebus-7.11.2/tests/utilities.py
new/azure-servicebus-7.11.3/tests/utilities.py
--- old/azure-servicebus-7.11.2/tests/utilities.py 2023-09-13
21:58:31.000000000 +0200
+++ new/azure-servicebus-7.11.3/tests/utilities.py 2023-10-11
22:08:58.000000000 +0200
@@ -15,7 +15,7 @@
uamqp_available = False
from azure.servicebus._common.utils import utc_now
-# TODO: temporary - disable uamqp if China b/c of 8+ hr runtime
+# temporary - disable uamqp if China b/c of 8+ hr runtime
uamqp_available = uamqp_available and
os.environ.get('SERVICEBUS_ENDPOINT_SUFFIX') != '.servicebus.chinacloudapi.cn'