Sorry, I hope I am not beating a dead horse here, but I thought it might be 
helpful to report the results of my investigation.

First of all, recall that my use case is the listener could be patient for long 
periods of time but then become busy on a long-running task as the result of a 
message received -- much longer than TTL in both cases.

Now consider this code, which I used in my investigation:

-------
# Message Consumer/Worker that monitors the "AAA" queue.
from json import dumps
import loggers
from stomp import (Connection12,
                   ConnectionListener)
import sys
import time


logger = loggers.newlogger(__name__)


# Listener that consumes messages requesting AAA processing.
class AAAHeartbeatListener(ConnectionListener):
    """Listener that consumes messages requesting AAA processing.

    Every connection event is logged.  ``on_message`` additionally simulates
    a long-running job (about two minutes) before ACKing the message, which
    is the scenario under investigation: work that takes much longer than
    the broker's connection TTL.
    """

    def __init__(self,
                 conn):
        # The connection is kept so on_message can ACK/NACK through it.
        self.conn = conn

    def on_connected(self,
                     frame):
        # Called by the STOMP connection when a CONNECTED frame is received
        # (after a connection has been established or re-established).
        # Parameters: frame (Frame) - the stomp frame
        logger.info("on_connected: " + str(frame))

    def on_connecting(self,
                      host_and_port):
        # Called by the STOMP connection once a TCP/IP connection to the
        # STOMP server has been established or re-established. Note that at
        # this point, no connection has been established on the STOMP
        # protocol level. For this, you need to invoke the "connect" method
        # on the connection.
        # Parameters: host_and_port ((str,int)) - a tuple containing the
        # host name and port number to which the connection has been
        # established.
        logger.info("on_connecting: " + str(host_and_port))

    def on_disconnected(self):
        # Called by the STOMP connection when a TCP/IP connection to the
        # STOMP server has been lost. No messages should be sent via the
        # connection until it has been reestablished.
        logger.info("on_disconnected")

    def on_disconnecting(self):
        # Called before a DISCONNECT frame is sent.
        logger.info("on_disconnecting")

    def on_error(self,
                 frame):
        # Called by the STOMP connection when an ERROR frame is received.
        # Parameters: frame (Frame) - the stomp frame
        logger.error("on_error: " + str(frame))

    def on_heartbeat(self):
        # Called on receipt of a heartbeat.
        logger.info("on_heartbeat")

    def on_heartbeat_timeout(self):
        # Called by the STOMP connection when a heartbeat message has not
        # been received beyond the specified period.
        logger.info("on_heartbeat_timeout")

    def on_message(self,
                   frame):
        # Called by the STOMP connection when a MESSAGE frame is received.
        # Parameters: frame (Frame) - the stomp frame
        # message is a stomp.utils.Frame with:
        #     message.cmd == "MESSAGE"
        #     message.headers == dict()
        #     message.body == str
        logger.debug("on_message: " + str(frame))

        # Pre-bind both identifiers so the error path below can tell whether
        # the headers were read before the failure occurred.
        message_id = None
        subscription = None
        try:
            message_id = frame.headers["message-id"]
            subscription = int(frame.headers["subscription"])

            logger.info("Processing message with ID:" + str(message_id) +
                        " and Subscription:" + str(subscription))
            # Simulate a long-running process -- much longer than TTL.
            for _ in range(0, 1200):
                # 100ms sleep intervals (should) keep the client from
                # blocking.
                time.sleep(0.1)
            logger.info("Processed message with ID:" + str(message_id) +
                        " and Subscription:" + str(subscription))
            # NOTE(review): confirm these positional arguments match the
            # ack()/nack() signature of the installed stomp.py version for
            # Connection12.
            self.conn.ack(message_id,
                          subscription)
            logger.info("Message ACKed")
        except Exception:
            logger.error(loggers.format_exc_info("Failed to process message: "
                                                 + str(frame)))
            if message_id is None or subscription is None:
                # The failure happened while reading the headers, so there
                # is no identifier to NACK with; the error is already logged.
                return
            self.conn.nack(message_id,
                           subscription)
            logger.info("Message NACKed")

    def on_receipt(self,
                   frame):
        # Called by the STOMP connection when a RECEIPT frame is received,
        # sent by the server if requested by the client using the 'receipt'
        # header.
        # Parameters: frame (Frame) - the stomp frame
        logger.debug("on_receipt: " + str(frame))

    def on_receiver_loop_completed(self,
                                   frame):
        # Called when the connection receiver_loop has finished.
        logger.debug("on_receiver_loop_completed: " + str(frame))

    def on_send(self,
                frame):
        # Called by the STOMP connection when it is in the process of
        # sending a message
        logger.debug("on_send: " + str(frame))


if __name__ == "__main__":
    # Broker endpoint (STOMP acceptor).
    host = "localhost"
    port = 61613
    host_and_ports = [(host, port)]

    # Heart-beat settings handed to the connection, in milliseconds, as a
    # (client, server) pair.
    from_client_hb_millis = 30000
    from_server_hb_millis = 10000
    heartbeats = (from_client_hb_millis,
                  from_server_hb_millis)

    logger.info("Listening to the AAA ActiveMQ queue on " +
                dumps(host_and_ports))

    # Connect and consume from the queue.
    conn = Connection12(host_and_ports=host_and_ports,
                        heartbeats=heartbeats)
    listener = AAAHeartbeatListener(conn)
    conn.set_listener("",
                      listener)
    conn.connect("admin",
                 "admin",
                 wait=True,
                 headers={"client-id": sys.argv[0]})

    # Stuff a few messages onto the queue.
    for _ in range(0, 3):
        conn.send("AAA",
                  dumps({"message": "How's yer mom en em?"}),
                  "application/json",
                  headers={"destination-type": "ANYCAST",
                           "persistent": "true"})

    # We set the ActiveMQ prefetchSize to 1 so that only one message
    # is dispatched at a time.
    conn.subscribe("AAA",
                   42,
                   ack="client-individual",
                   headers={"subscription-type": "ANYCAST",
                            "consumer-window-size": 0})

    # Keep the process alive until the connection terminates.
    try:
        while conn.is_connected():
            time.sleep(30)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the worker; any other
        # exception is a real error and is allowed to surface after the
        # disconnect below.
        pass
    finally:
        conn.disconnect()

    logger.info("I am dead.")
-------

If I run this as is I get:

-------
/usr/bin/python3.6 /pub/dev/flotsam/run_heartbeat_test.py
2022-06-24 09:23:06,187 __main__ INFO:
Listening to the AAA ActiveMQ queue on [["localhost", 61613]]

2022-06-24 09:23:06,310 __main__ INFO:
on_connecting: ('localhost', 61613)

2022-06-24 09:23:06,315 __main__ INFO:
on_connected: {cmd=CONNECTED,headers=[{'version': '1.2', 'session': '672525f2', 
'server': 'ActiveMQ-Artemis/2.22.0 ActiveMQ Artemis Messaging Engine', 
'heart-beat': '10000,30000'}],body=}

2022-06-24 09:23:06,523 __main__ INFO:
Processing message with ID:10205 and Subscription:42

Heartbeat timeout: diff_receive=15.000172974017914, time=264585.48758635, 
lastrec=264570.487413376
2022-06-24 09:23:36,316 __main__ INFO:
on_disconnected

2022-06-24 09:23:36,316 __main__ INFO:
on_heartbeat_timeout

2022-06-24 09:23:36,335 __main__ INFO:
I am dead.


Process finished with exit code 0
-------

Note that the client does not process the server heartbeat while it is busy 
with the message, as if it's processing server heartbeats on the same thread as 
the one processing the long-running message.

On the other hand, if I set from_server_hb_millis = 0 I get the behavior I 
would expect -- the messages are processed, even though they take longer than 
TTL to complete -- and the listener stays alive until I kill it, as if it's 
sending heartbeats in another thread.

Now for my purposes the latter scenario (not requesting server HBs) is all I 
need, but I just thought you might want to know about this with the hope that 
it's helpful to someone else down the line.

Thank you!

Regards,

Rich Bergmann

________________________________
From: Richard Bergmann <rbergm...@colsa.com.INVALID>
Sent: Thursday, June 23, 2022 11:13 AM
To: users@activemq.apache.org <users@activemq.apache.org>
Subject: Re: [External] - Re: How to avoid AMQ229014 TTL "dump" for a "patient" 
Python 3 Stomp Listener

CAUTION: This email originated from outside of the organization. Do not click 
links or open attachments unless you recognize the sender and know the content 
is safe.


Thank you Justin.  It's just now becoming apparent to me that the heartbeating 
mechanism is built into the client connection, i.e., I don't have to do 
anything on the on-heartbeat event.  Doh!

From what I've read I need to create a Connection12 to make all this go.  So 
now I will proceed to try to get all this working.

Thank you for your help and your patience.

Regards,

Rich Bergmann
________________________________
From: Justin Bertram <jbert...@apache.org>
Sent: Thursday, June 23, 2022 11:01 AM
To: users@activemq.apache.org <users@activemq.apache.org>
Subject: Re: [External] - Re: How to avoid AMQ229014 TTL "dump" for a "patient" 
Python 3 Stomp Listener

CAUTION: This email originated from outside of the organization. Do not click 
links or open attachments unless you recognize the sender and know the content 
is safe.


Heart-beating is something that your client implementation should provide
for you automatically. You shouldn't need to send your own heart-beats
manually. The client should also provide a way to configure the
heart-beating on the initial connection so that it can send the proper
header on the CONNECT frame. Does your Python STOMP client support
heart-beating? The fact that you're not receiving any heart-beats indicates
that either your client doesn't support it or you're not configuring it
properly.


Justin


On Thu, Jun 23, 2022 at 9:22 AM Richard Bergmann
<rbergm...@colsa.com.invalid> wrote:

> I am set up to receive the heartbeat event on-heartbeat, but it is never
> fired.
>
> Heartbeat makes sense, and I did a bit of digging on that, but the "STOMP
> heart-beating and connection-ttl" section within
> https://activemq.apache.org/components/artemis/documentation/latest/stomp.html
> (which I think I read maybe 50 times over the last two days?) basically sez
>
> (1) you need STOMP 1.1 or 1.2 (1.0 is the default)
> (2) you need to send a heartbeat header
> (3) you need to respond with a heartbeat
>
> But nowhere does it specify WHAT a heartbeat message is!!!!  Yes (and
> just know), massive Googling presents me with no examples of HOW to do this.
>
> As I am using a basic STOMP Python connection (presumably 1.0), I am HAPPY
> to send unsolicited heartbeats to the broker every, say, 30 seconds, but
> I'm left wanting for just WHAT that heartbeat call is!!!!!
>
> Sorry, kinda frustrated here . . .
> ________________________________
> From: Justin Bertram <jbert...@apache.org>
> Sent: Thursday, June 23, 2022 9:50 AM
> To: users@activemq.apache.org <users@activemq.apache.org>
> Subject: [External] - Re: How to avoid AMQ229014 TTL "dump" for a
> "patient" Python 3 Stomp Listener
>
> CAUTION: This email originated from outside of the organization. Do not
> click links or open attachments unless you recognize the sender and know
> the content is safe.
>
>
> > Is there a more acceptable way to alert the broker that the listener is
> still alive and well but just being patient?
>
> Yes. STOMP already has this via heartbeating [1]. Does your Python client
> support heartbeating? If so, are you configuring it?
>
>
> Justin
>
> [1]
> https://stomp.github.io/stomp-specification-1.2.html#Heart-beating
>
> On Thu, Jun 23, 2022 at 7:56 AM Richard Bergmann
> <rbergm...@colsa.com.invalid> wrote:
>
> > I migrated my application from Classic to Artemis, and now my listeners
> > are being summarily disconnected by the broker on a TTL timeout.
> >
> > My use case is that I have listeners (subscribers) that are launched as a
> > service and patiently wait (could be for days!!!!) for messages to arrive
> > on a work queue.  Furthermore, the work to be done can take perhaps up to
> > an hour . . . well past the default TTL timeout of 60 seconds.  This was
> > not an issue in Classic, but certainly is in Artemis.
> >
> > My solution was to bump the STOMP acceptor timeout to something
> > stratospheric (e.g., 10,000,000,000), but this just feels . . . icky!
> >
> > I've seen a number of threads recommend doing a "connect and subscribe"
> > loop in the on-disconnect event, but I have a bad feeling about this if
> the
> > listener is working on a message that is either NACKed on the disconnect,
> > or the ACK fails because the work was completed between the disconnect
> and
> > the reconnect.  In other words, this also feels icky!
> >
> > Is there a more acceptable way to alert the broker that the listener is
> > still alive and well but just being patient?  For example, is there a
> NOOP
> > call that can be made to the broker every, say, 30 seconds, to say "I'm
> > still here!  Please don't kill me!!!"?  Since the listener may be in the
> > throes of a long-running process, I can see having the listener spawn a
> > thread that sends the "I'm alive!" message every 30 seconds, then,
> > terminate the thread on a successful shutdown.  That way the broker can
> > kill legitimately "dead" listeners by keeping the reasonable 60 second
> > timeout, but not assassinate my legitimate patient listeners.
> >
> > Thank you!  :)
> > ________________________________
> > The information contained in this e-mail and any attachments from COLSA
> > Corporation may contain company sensitive and/or proprietary information,
> > and is intended only for the named recipient to whom it was originally
> > addressed. If you are not the intended recipient, any disclosure,
> > distribution, or copying of this e-mail or its attachments is strictly
> > prohibited. If you have received this e-mail in error, please notify the
> > sender immediately by return e-mail and permanently delete the e-mail and
> > any attachments.
> >
> >
> > COLSA Proprietary
> >
> ________________________________
> The information contained in this e-mail and any attachments from COLSA
> Corporation may contain company sensitive and/or proprietary information,
> and is intended only for the named recipient to whom it was originally
> addressed. If you are not the intended recipient, any disclosure,
> distribution, or copying of this e-mail or its attachments is strictly
> prohibited. If you have received this e-mail in error, please notify the
> sender immediately by return e-mail and permanently delete the e-mail and
> any attachments.
>
>
> COLSA Proprietary
>
________________________________
The information contained in this e-mail and any attachments from COLSA 
Corporation may contain company sensitive and/or proprietary information, and 
is intended only for the named recipient to whom it was originally addressed. 
If you are not the intended recipient, any disclosure, distribution, or copying 
of this e-mail or its attachments is strictly prohibited. If you have received 
this e-mail in error, please notify the sender immediately by return e-mail and 
permanently delete the e-mail and any attachments.


COLSA Proprietary

COLSA Proprietary
________________________________
The information contained in this e-mail and any attachments from COLSA 
Corporation may contain company sensitive and/or proprietary information, and 
is intended only for the named recipient to whom it was originally addressed. 
If you are not the intended recipient, any disclosure, distribution, or copying 
of this e-mail or its attachments is strictly prohibited. If you have received 
this e-mail in error, please notify the sender immediately by return e-mail and 
permanently delete the e-mail and any attachments.


COLSA Proprietary

Reply via email to